Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc fixes for LOD syncing and upgrade Morango #11525

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion kolibri/core/auth/kolibri_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def handle_initial(self, context):
"""
:type context: morango.sync.context.LocalSessionContext
"""
if context.is_receiver:
from kolibri.core.device.utils import device_provisioned

if context.is_receiver and device_provisioned():
is_pull = context.is_pull
is_push = context.is_push
sync_filter = str(context.filter)
Expand Down
47 changes: 26 additions & 21 deletions kolibri/core/auth/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from kolibri.core.serializers import HexOnlyUUIDField
from kolibri.core.tasks.decorators import register_task
from kolibri.core.tasks.exceptions import JobNotFound
from kolibri.core.tasks.exceptions import JobRunning
from kolibri.core.tasks.job import JobStatus
from kolibri.core.tasks.job import Priority
from kolibri.core.tasks.job import State
Expand Down Expand Up @@ -424,6 +425,7 @@ def validate(self, data):
queue=soud_sync_queue,
priority=Priority.HIGH,
status_fn=status_fn,
long_running=True,
)
def soud_sync_processing():
# run processing
Expand All @@ -433,37 +435,39 @@ def soud_sync_processing():
if next_run is not None:
job = get_current_job()
job.retry_in(next_run)
else:
logger.info("Skipping enqueue of SoUD sync processing: no attempts remaining")


def enqueue_soud_sync_processing(force=False):
def enqueue_soud_sync_processing():
"""
Enqueue a task to process SoUD syncs, if necessary
"""
next_run = soud.get_time_to_next_attempt()
if next_run is None:
# No need to enqueue, as there is no next run
logger.info("Skipping enqueue of SoUD sync processing: no eligible syncs")
return

if force:
job_storage.cancel_if_exists(SOUD_SYNC_PROCESSING_JOB_ID)
else:
# Check if there is already an enqueued job
try:
converted_next_run = naive_utc_datetime(timezone.now() + next_run)
orm_job = job_storage.get_orm_job(SOUD_SYNC_PROCESSING_JOB_ID)
if (
orm_job.state == State.RUNNING
or orm_job.state == State.QUEUED
and orm_job.scheduled_time <= converted_next_run
):
# Already queued sooner or at the same time as the next run
return
# Otherwise, cancel the existing job, and re-enqueue
job_storage.cancel_if_exists(SOUD_SYNC_PROCESSING_JOB_ID)
except JobNotFound:
pass

soud_sync_processing.enqueue_in(next_run)
# Check if there is already an enqueued job
try:
converted_next_run = naive_utc_datetime(timezone.now() + next_run)
orm_job = job_storage.get_orm_job(SOUD_SYNC_PROCESSING_JOB_ID)
if (
orm_job.state not in (State.COMPLETED, State.FAILED, State.CANCELED)
and orm_job.scheduled_time <= converted_next_run
):
# Already queued sooner or at the same time as the next run
logger.info("Skipping enqueue of SoUD sync processing: scheduled sooner")
return
except JobNotFound:
pass

logger.info("Enqueuing SoUD sync processing in {}".format(next_run))
try:
soud_sync_processing.enqueue_in(next_run)
except JobRunning:
logger.info("Skipping enqueue of SoUD sync processing: already running")


@register_task(
Expand Down Expand Up @@ -578,6 +582,7 @@ def validate(self, data):
queue=soud_sync_queue,
permission_classes=[IsSuperAdmin() | NotProvisioned()],
status_fn=status_fn,
long_running=True,
)
def peeruserimport(command, **kwargs):
call_command(command, **kwargs)
Expand Down
66 changes: 35 additions & 31 deletions kolibri/core/auth/test/test_auth_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from kolibri.core.discovery.models import NetworkLocation
from kolibri.core.discovery.utils.network.errors import NetworkLocationNotFound
from kolibri.core.discovery.utils.network.errors import ResourceGoneError
from kolibri.core.tasks.exceptions import JobRunning
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State
from kolibri.utils.time_utils import naive_utc_datetime
Expand Down Expand Up @@ -692,31 +693,6 @@ def test_validate_and_create_sync_credentials_no_credentials(


class SoudTasksTestCase(TestCase):
@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
def test_enqueue_soud_sync_processing__none__forced(self, mock_soud, mock_task):
mock_soud.get_time_to_next_attempt.return_value = None
enqueue_soud_sync_processing(force=True)
mock_task.enqueue_in.assert_not_called()

@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
def test_enqueue_soud_sync_processing__now__forced(self, mock_soud, mock_task):
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=0)
enqueue_soud_sync_processing(force=True)
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=0))

@patch("kolibri.core.auth.tasks.job_storage")
@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
def test_enqueue_soud_sync_processing__future__forced(
self, mock_soud, mock_task, mock_job_storage
):
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=15)
enqueue_soud_sync_processing(force=True)
mock_job_storage.cancel_if_exists.assert_called_once_with("50")
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=15))

@patch("kolibri.core.auth.tasks.job_storage")
@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
Expand All @@ -728,7 +704,6 @@ def test_enqueue_soud_sync_processing__future__scheduled(
mock_job.state = State.QUEUED
mock_job.scheduled_time = naive_utc_datetime(timezone.now())
enqueue_soud_sync_processing()
mock_job_storage.cancel_if_exists.assert_not_called()
mock_task.enqueue_in.assert_not_called()

@patch("kolibri.core.auth.tasks.job_storage")
Expand All @@ -737,14 +712,11 @@ def test_enqueue_soud_sync_processing__future__scheduled(
def test_enqueue_soud_sync_processing__future__running(
self, mock_soud, mock_task, mock_job_storage
):
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(
seconds=-10
)
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=1)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.RUNNING
mock_job.scheduled_time = naive_utc_datetime(timezone.now())
enqueue_soud_sync_processing()
mock_job_storage.cancel_if_exists.assert_not_called()
mock_task.enqueue_in.assert_not_called()

@patch("kolibri.core.auth.tasks.job_storage")
Expand All @@ -760,7 +732,39 @@ def test_enqueue_soud_sync_processing__future__reschedule(
timezone.now() + datetime.timedelta(seconds=15)
)
enqueue_soud_sync_processing()
mock_job_storage.cancel_if_exists.assert_called_once_with("50")
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))

@patch("kolibri.core.auth.tasks.job_storage")
@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
def test_enqueue_soud_sync_processing__completed__enqueue(
self, mock_soud, mock_task, mock_job_storage
):
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=10)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.COMPLETED
# far in the past
mock_job.scheduled_time = naive_utc_datetime(
timezone.now() - datetime.timedelta(seconds=100)
)
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))

@patch("kolibri.core.auth.tasks.job_storage")
@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
def test_enqueue_soud_sync_processing__race__already_running(
self, mock_soud, mock_task, mock_job_storage
):
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=10)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.COMPLETED
# far in the past
mock_job.scheduled_time = naive_utc_datetime(
timezone.now() - datetime.timedelta(seconds=100)
)
mock_task.enqueue_in.side_effect = JobRunning()
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))

@patch("kolibri.core.auth.tasks.get_current_job")
Expand Down
2 changes: 2 additions & 0 deletions kolibri/core/auth/test/test_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
from django.test import TestCase
from morango.sync.context import LocalSessionContext

from .helpers import provision_device
from kolibri.core.auth.kolibri_plugin import AuthSyncHook
from kolibri.core.auth.kolibri_plugin import CleanUpTaskOperation


@mock.patch("kolibri.core.auth.kolibri_plugin.cleanupsync")
class CleanUpTaskOperationTestCase(TestCase):
def setUp(self):
provision_device()
self.context = mock.MagicMock(
spec=LocalSessionContext(),
filter=uuid.uuid4().hex,
Expand Down
17 changes: 15 additions & 2 deletions kolibri/core/device/soud.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def get_time_to_next_attempt():
)
if attempt_at is None:
return None
return datetime.timedelta(seconds=attempt_at - time.time())
return datetime.timedelta(seconds=max(attempt_at - time.time(), 0))


def attempt_execute_window():
Expand All @@ -332,6 +332,15 @@ def execute_syncs():
"""
Core SoUD sync processing logic that processes any syncs that
"""
# since there should only ever be one processing job running at a time, if we encounter any in
# the queue that are marked as syncing, we should reset their status to pending because it must
# mean that the previous job was terminated unexpectedly
SyncQueue.objects.filter(status=SyncQueueStatus.Syncing,).update(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

status=SyncQueueStatus.Pending,
updated=time.time(),
keep_alive=0,
)

base_qs = (
get_eligible_syncs()
.order_by("attempt_at")
Expand Down Expand Up @@ -376,6 +385,10 @@ def execute_sync(context):
sync_queue.save()

try:
# context filters the network location to only those marked available
if not context.network_location:
raise NetworkLocation.DoesNotExist

call_command(
command,
user=context.user_id,
Expand All @@ -386,10 +399,10 @@ def execute_sync(context):
**resume_kwargs
)
except NetworkLocation.DoesNotExist:
# network location may have become unavailable
cleanup = True
logger.debug("{} Network location unavailable".format(context))
sync_queue.status = SyncQueueStatus.Pending
sync_queue.increment_and_backoff_next_attempt()
except Exception as e:
cleanup = True
if isinstance(e, MorangoResumeSyncError):
Expand Down
Loading