Skip to content

Commit

Permalink
Merge pull request #3934 from bjester/fix-blaines-mistakes
Browse files Browse the repository at this point in the history
Fix production task-related issues and issue logging out
  • Loading branch information
rtibbles authored Feb 2, 2023
2 parents 3d89859 + 6c29020 commit 15d1bb6
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ const debouncedSyncChanges = debounce(() => {
if (!syncActive) {
return syncChanges();
}
// TODO: actually return promise that resolves when active sync completes
return new Promise(resolve => setTimeout(resolve, 1000));
}, SYNC_IF_NO_CHANGES_FOR * 1000);

if (process.env.NODE_ENV !== 'production' && typeof window !== 'undefined') {
Expand Down Expand Up @@ -391,6 +393,9 @@ export function stopSyncing() {
db.on('changes').unsubscribe(handleChanges);
}

/**
* @return {Promise}
*/
export function forceServerSync() {
debouncedSyncChanges();
return debouncedSyncChanges.flush();
Expand Down
18 changes: 0 additions & 18 deletions contentcuration/contentcuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
from contentcuration.db.models.manager import CustomManager
from contentcuration.statistics import record_channel_stats
from contentcuration.utils.cache import delete_public_channel_cache_keys
from contentcuration.utils.celery.tasks import generate_task_signature
from contentcuration.utils.parser import load_json_string
from contentcuration.viewsets.sync.constants import ALL_CHANGES
from contentcuration.viewsets.sync.constants import ALL_TABLES
Expand Down Expand Up @@ -2451,7 +2450,6 @@ class TaskResultCustom(object):
signature = models.CharField(null=True, blank=False, max_length=32)

super_as_dict = TaskResult.as_dict
super_save = TaskResult.save

def as_dict(self):
"""
Expand All @@ -2465,22 +2463,6 @@ def as_dict(self):
)
return super_dict

def set_signature(self):
"""
Generates and sets the signature for the task if it isn't set
"""
if self.signature is not None:
# nothing to do
return
self.signature = generate_task_signature(self.task_name, task_kwargs=self.task_kwargs, channel_id=self.channel_id)

def save(self, *args, **kwargs):
"""
Override save to ensure signature is generated
"""
self.set_signature()
return self.super_save(*args, **kwargs)

@classmethod
def contribute_to_class(cls, model_class=TaskResult):
"""
Expand Down
2 changes: 1 addition & 1 deletion contentcuration/contentcuration/tests/test_asynctask.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def test_fetch_or_enqueue_task__channel_id__uuid_then_hex(self):
self.assertEqual(expected_task.task_id, async_result.task_id)

def test_requeue_task(self):
signature = requeue_test_task._generate_signature({})
signature = requeue_test_task.generate_signature({})
existing_task_ids = requeue_test_task.find_ids(signature)
self.assertEqual(len(existing_task_ids), 0)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,39 @@
from __future__ import absolute_import

import datetime
import json

import pytz
from django.conf import settings
from django.core.cache import cache
from django.urls import reverse
from mock import Mock
from mock import patch
from rest_framework.reverse import reverse

from .base import BaseAPITestCase
from .testdata import tree
from contentcuration.models import Channel
from contentcuration.tasks import generatenodediff_task
from contentcuration.tests.base import BaseAPITestCase


class NodesViewsTestCase(BaseAPITestCase):
def test_get_node_diff__missing_contentnode(self):
response = self.get(reverse("get_node_diff", kwargs=dict(updated_id="abc123", original_id="def456")))
self.assertEqual(response.status_code, 404)

def test_get_node_diff__no_task_processing(self):
pk = self.channel.main_tree.pk
response = self.get(reverse("get_node_diff", kwargs=dict(updated_id=pk, original_id=pk)))
self.assertEqual(response.status_code, 404)

@patch.object(generatenodediff_task, 'find_incomplete_ids')
def test_get_node_diff__task_processing(self, mock_find_incomplete_ids):
qs = Mock(spec="django.db.models.query.QuerySet")
mock_find_incomplete_ids.return_value = qs()
mock_find_incomplete_ids.return_value.exists.return_value = True

pk = self.channel.main_tree.pk
response = self.get(reverse("get_node_diff", kwargs=dict(updated_id=pk, original_id=pk)))
self.assertEqual(response.status_code, 302)

class NodeViewsUtilityTestCase(BaseAPITestCase):
def test_get_channel_details(self):
url = reverse('get_channel_details', [self.channel.id])
url = reverse('get_channel_details', kwargs={"channel_id": self.channel.id})
response = self.get(url)

details = json.loads(response.content)
Expand All @@ -33,24 +50,22 @@ def test_get_channel_details_cached(self):
cache.set(cache_key, json.dumps(data))

with patch("contentcuration.views.nodes.getnodedetails_task") as task_mock:
url = reverse('get_channel_details', [self.channel.id])
url = reverse('get_channel_details', kwargs={"channel_id": self.channel.id})
self.get(url)
# Check that the outdated cache prompts an asynchronous cache update
task_mock.enqueue.assert_called_once_with(self.user, node_id=self.channel.main_tree.id)


class GetTopicDetailsEndpointTestCase(BaseAPITestCase):
class ChannelDetailsEndpointTestCase(BaseAPITestCase):
def test_200_post(self):
response = self.get(
reverse("get_channel_details", kwargs={"channel_id": self.channel.id})
)
self.assertEqual(response.status_code, 200)

def test_404_no_permission(self):
new_channel = Channel.objects.create()
new_channel.main_tree = tree()
new_channel.save()
self.channel.editors.remove(self.user)
response = self.get(
reverse("get_channel_details", kwargs={"channel_id": new_channel.id}),
reverse("get_channel_details", kwargs={"channel_id": self.channel.id}),
)
self.assertEqual(response.status_code, 404)
37 changes: 24 additions & 13 deletions contentcuration/contentcuration/utils/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from celery.app.task import Task
from celery.result import AsyncResult
from django.db import transaction
from django.db.utils import IntegrityError

from contentcuration.constants.locking import TASK_LOCK
from contentcuration.db.advisory_lock import advisory_lock
Expand Down Expand Up @@ -139,7 +140,7 @@ def _prepare_kwargs(self, kwargs):
for key, value in kwargs.items()
)

def _generate_signature(self, kwargs):
def generate_signature(self, kwargs):
"""
:param kwargs: A dictionary of task kwargs
:return: An hex string representing an md5 hash of task metadata
Expand Down Expand Up @@ -207,7 +208,7 @@ def enqueue(self, user, **kwargs):

signature = kwargs.pop('signature', None)
if signature is None:
signature = self._generate_signature(kwargs)
signature = self.generate_signature(kwargs)

task_id = uuid.uuid4().hex
prepared_kwargs = self._prepare_kwargs(kwargs)
Expand All @@ -224,14 +225,24 @@ def enqueue(self, user, **kwargs):
# ensure the result is saved to the backend (database)
self.backend.add_pending_result(async_result)

# after calling apply, we should have task result model, so get it and set our custom fields
task_result = get_task_model(self, task_id)
task_result.task_name = self.name
task_result.task_kwargs = self.backend.encode(prepared_kwargs)
task_result.user = user
task_result.channel_id = channel_id
task_result.signature = signature
task_result.save()
saved = False
tries = 0
while not saved:
# after calling apply, we should ideally have a task result model saved to the DB, but it relies on celery's
# event consumption, and we might try to retrieve it before it has actually saved, so we retry
try:
task_result = get_task_model(self, task_id)
task_result.task_name = self.name
task_result.task_kwargs = self.backend.encode(prepared_kwargs)
task_result.user = user
task_result.channel_id = channel_id
task_result.signature = signature
task_result.save()
saved = True
except IntegrityError as e:
tries += 1
if tries > 3:
raise e
return async_result

def fetch_or_enqueue(self, user, **kwargs):
Expand All @@ -249,7 +260,7 @@ def fetch_or_enqueue(self, user, **kwargs):
if self.app.conf.task_always_eager:
return self.enqueue(user, **kwargs)

signature = self._generate_signature(kwargs)
signature = self.generate_signature(kwargs)

# create an advisory lock to obtain exclusive control on preventing task duplicates
with self._lock_signature(signature):
Expand Down Expand Up @@ -278,7 +289,7 @@ def requeue(self, **kwargs):
task_result = get_task_model(self, request.id)
task_kwargs = request.kwargs.copy()
task_kwargs.update(kwargs)
signature = self._generate_signature(kwargs)
signature = self.generate_signature(kwargs)
logging.info(f"Re-queuing task {self.name} for user {task_result.user.pk} from {request.id} | {signature}")
return self.enqueue(task_result.user, signature=signature, **task_kwargs)

Expand All @@ -289,7 +300,7 @@ def revoke(self, exclude_task_ids=None, **kwargs):
:param kwargs: Task keyword arguments that will be used to match against tasks
:return: The number of tasks revoked
"""
signature = self._generate_signature(kwargs)
signature = self.generate_signature(kwargs)
task_ids = self.find_incomplete_ids(signature)

if exclude_task_ids is not None:
Expand Down
3 changes: 2 additions & 1 deletion contentcuration/contentcuration/views/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ def get_node_diff(request, updated_id, original_id):
if data:
return Response(data)

signature = generatenodediff_task.generate_signature(dict(updated_id=updated_id, original_id=original_id))
# See if there's already a staging task in progress
if generatenodediff_task.find_incomplete_ids(updated_id=updated_id, original_id=original_id).exists():
if generatenodediff_task.find_incomplete_ids(signature).exists():
return Response('Diff is being generated', status=status.HTTP_302_FOUND)
except ContentNode.DoesNotExist:
pass
Expand Down

0 comments on commit 15d1bb6

Please sign in to comment.