Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge down hotfixes into unstable #4001

Merged
merged 5 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ const debouncedSyncChanges = debounce(() => {
if (!syncActive) {
return syncChanges();
}
// TODO: actually return promise that resolves when active sync completes
return new Promise(resolve => setTimeout(resolve, 1000));
}, SYNC_IF_NO_CHANGES_FOR * 1000);

if (process.env.NODE_ENV !== 'production' && typeof window !== 'undefined') {
Expand Down Expand Up @@ -391,6 +393,9 @@ export function stopSyncing() {
db.on('changes').unsubscribe(handleChanges);
}

/**
* @return {Promise}
*/
export function forceServerSync() {
debouncedSyncChanges();
return debouncedSyncChanges.flush();
Expand Down
18 changes: 0 additions & 18 deletions contentcuration/contentcuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
from contentcuration.db.models.manager import CustomManager
from contentcuration.statistics import record_channel_stats
from contentcuration.utils.cache import delete_public_channel_cache_keys
from contentcuration.utils.celery.tasks import generate_task_signature
from contentcuration.utils.parser import load_json_string
from contentcuration.viewsets.sync.constants import ALL_CHANGES
from contentcuration.viewsets.sync.constants import ALL_TABLES
Expand Down Expand Up @@ -2451,7 +2450,6 @@ class TaskResultCustom(object):
signature = models.CharField(null=True, blank=False, max_length=32)

super_as_dict = TaskResult.as_dict
super_save = TaskResult.save

def as_dict(self):
"""
Expand All @@ -2465,22 +2463,6 @@ def as_dict(self):
)
return super_dict

def set_signature(self):
"""
Generates and sets the signature for the task if it isn't set
"""
if self.signature is not None:
# nothing to do
return
self.signature = generate_task_signature(self.task_name, task_kwargs=self.task_kwargs, channel_id=self.channel_id)

def save(self, *args, **kwargs):
"""
Override save to ensure signature is generated
"""
self.set_signature()
return self.super_save(*args, **kwargs)

@classmethod
def contribute_to_class(cls, model_class=TaskResult):
"""
Expand Down
2 changes: 1 addition & 1 deletion contentcuration/contentcuration/tests/test_asynctask.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def test_fetch_or_enqueue_task__channel_id__uuid_then_hex(self):
self.assertEqual(expected_task.task_id, async_result.task_id)

def test_requeue_task(self):
signature = requeue_test_task._generate_signature({})
signature = requeue_test_task.generate_signature({})
existing_task_ids = requeue_test_task.find_ids(signature)
self.assertEqual(len(existing_task_ids), 0)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,39 @@
from __future__ import absolute_import

import datetime
import json

import pytz
from django.conf import settings
from django.core.cache import cache
from django.urls import reverse
from mock import Mock
from mock import patch
from rest_framework.reverse import reverse

from .base import BaseAPITestCase
from .testdata import tree
from contentcuration.models import Channel
from contentcuration.tasks import generatenodediff_task
from contentcuration.tests.base import BaseAPITestCase


class NodesViewsTestCase(BaseAPITestCase):
def test_get_node_diff__missing_contentnode(self):
response = self.get(reverse("get_node_diff", kwargs=dict(updated_id="abc123", original_id="def456")))
self.assertEqual(response.status_code, 404)

def test_get_node_diff__no_task_processing(self):
pk = self.channel.main_tree.pk
response = self.get(reverse("get_node_diff", kwargs=dict(updated_id=pk, original_id=pk)))
self.assertEqual(response.status_code, 404)

@patch.object(generatenodediff_task, 'find_incomplete_ids')
def test_get_node_diff__task_processing(self, mock_find_incomplete_ids):
qs = Mock(spec="django.db.models.query.QuerySet")
mock_find_incomplete_ids.return_value = qs()
mock_find_incomplete_ids.return_value.exists.return_value = True

pk = self.channel.main_tree.pk
response = self.get(reverse("get_node_diff", kwargs=dict(updated_id=pk, original_id=pk)))
self.assertEqual(response.status_code, 302)

class NodeViewsUtilityTestCase(BaseAPITestCase):
def test_get_channel_details(self):
url = reverse('get_channel_details', [self.channel.id])
url = reverse('get_channel_details', kwargs={"channel_id": self.channel.id})
response = self.get(url)

details = json.loads(response.content)
Expand All @@ -33,24 +50,22 @@ def test_get_channel_details_cached(self):
cache.set(cache_key, json.dumps(data))

with patch("contentcuration.views.nodes.getnodedetails_task") as task_mock:
url = reverse('get_channel_details', [self.channel.id])
url = reverse('get_channel_details', kwargs={"channel_id": self.channel.id})
self.get(url)
# Check that the outdated cache prompts an asynchronous cache update
task_mock.enqueue.assert_called_once_with(self.user, node_id=self.channel.main_tree.id)


class GetTopicDetailsEndpointTestCase(BaseAPITestCase):
class ChannelDetailsEndpointTestCase(BaseAPITestCase):
def test_200_post(self):
response = self.get(
reverse("get_channel_details", kwargs={"channel_id": self.channel.id})
)
self.assertEqual(response.status_code, 200)

def test_404_no_permission(self):
new_channel = Channel.objects.create()
new_channel.main_tree = tree()
new_channel.save()
self.channel.editors.remove(self.user)
response = self.get(
reverse("get_channel_details", kwargs={"channel_id": new_channel.id}),
reverse("get_channel_details", kwargs={"channel_id": self.channel.id}),
)
self.assertEqual(response.status_code, 404)
37 changes: 24 additions & 13 deletions contentcuration/contentcuration/utils/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from celery.app.task import Task
from celery.result import AsyncResult
from django.db import transaction
from django.db.utils import IntegrityError

from contentcuration.constants.locking import TASK_LOCK
from contentcuration.db.advisory_lock import advisory_lock
Expand Down Expand Up @@ -139,7 +140,7 @@ def _prepare_kwargs(self, kwargs):
for key, value in kwargs.items()
)

def _generate_signature(self, kwargs):
def generate_signature(self, kwargs):
"""
:param kwargs: A dictionary of task kwargs
:return: An hex string representing an md5 hash of task metadata
Expand Down Expand Up @@ -207,7 +208,7 @@ def enqueue(self, user, **kwargs):

signature = kwargs.pop('signature', None)
if signature is None:
signature = self._generate_signature(kwargs)
signature = self.generate_signature(kwargs)

task_id = uuid.uuid4().hex
prepared_kwargs = self._prepare_kwargs(kwargs)
Expand All @@ -224,14 +225,24 @@ def enqueue(self, user, **kwargs):
# ensure the result is saved to the backend (database)
self.backend.add_pending_result(async_result)

# after calling apply, we should have task result model, so get it and set our custom fields
task_result = get_task_model(self, task_id)
task_result.task_name = self.name
task_result.task_kwargs = self.backend.encode(prepared_kwargs)
task_result.user = user
task_result.channel_id = channel_id
task_result.signature = signature
task_result.save()
saved = False
tries = 0
while not saved:
# after calling apply, we should ideally have a task result model saved to the DB, but it relies on celery's
# event consumption, and we might try to retrieve it before it has actually saved, so we retry
try:
task_result = get_task_model(self, task_id)
task_result.task_name = self.name
task_result.task_kwargs = self.backend.encode(prepared_kwargs)
task_result.user = user
task_result.channel_id = channel_id
task_result.signature = signature
task_result.save()
saved = True
except IntegrityError as e:
tries += 1
if tries > 3:
raise e
return async_result

def fetch_or_enqueue(self, user, **kwargs):
Expand All @@ -249,7 +260,7 @@ def fetch_or_enqueue(self, user, **kwargs):
if self.app.conf.task_always_eager:
return self.enqueue(user, **kwargs)

signature = self._generate_signature(kwargs)
signature = self.generate_signature(kwargs)

# create an advisory lock to obtain exclusive control on preventing task duplicates
with self._lock_signature(signature):
Expand Down Expand Up @@ -278,7 +289,7 @@ def requeue(self, **kwargs):
task_result = get_task_model(self, request.id)
task_kwargs = request.kwargs.copy()
task_kwargs.update(kwargs)
signature = self._generate_signature(kwargs)
signature = self.generate_signature(kwargs)
logging.info(f"Re-queuing task {self.name} for user {task_result.user.pk} from {request.id} | {signature}")
return self.enqueue(task_result.user, signature=signature, **task_kwargs)

Expand All @@ -289,7 +300,7 @@ def revoke(self, exclude_task_ids=None, **kwargs):
:param kwargs: Task keyword arguments that will be used to match against tasks
:return: The number of tasks revoked
"""
signature = self._generate_signature(kwargs)
signature = self.generate_signature(kwargs)
task_ids = self.find_incomplete_ids(signature)

if exclude_task_ids is not None:
Expand Down
3 changes: 2 additions & 1 deletion contentcuration/contentcuration/views/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ def get_node_diff(request, updated_id, original_id):
if data:
return Response(data)

signature = generatenodediff_task.generate_signature(dict(updated_id=updated_id, original_id=original_id))
# See if there's already a staging task in progress
if generatenodediff_task.find_incomplete_ids(updated_id=updated_id, original_id=original_id).exists():
if generatenodediff_task.find_incomplete_ids(signature).exists():
return Response('Diff is being generated', status=status.HTTP_302_FOUND)
except ContentNode.DoesNotExist:
pass
Expand Down