Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify retrying of sync tasks to use resumesync command #9493

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions kolibri/core/auth/management/commands/resumesync.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ class Command(MorangoSyncCommand):

def add_arguments(self, parser):
parser.add_argument(
"--id", type=str, help="ID of an incomplete session to resume sync"
"--sync-session-id",
type=str,
help="ID of an incomplete session to resume sync",
)
parser.add_argument(
"--baseurl", type=str, default=DATA_PORTAL_SYNCING_BASE_URL, dest="baseurl"
Expand Down Expand Up @@ -45,7 +47,7 @@ def add_arguments(self, parser):
def handle_async(self, *args, **options):
(baseurl, sync_session_id, chunk_size,) = (
options["baseurl"],
options["id"],
options["sync_session_id"],
options["chunk_size"],
)

Expand Down
9 changes: 8 additions & 1 deletion kolibri/core/auth/management/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,13 @@ def _sync(self, sync_session_client, **options): # noqa: C901
dataset_cache.clear()
dataset_cache.activate()

# add the sync session ID to the job (task) if it exists for retrying it
if self.job:
self.job.extra_metadata.update(
sync_session_id=sync_session_client.sync_session.id
)
self.job.save_meta()

if not noninteractive:
# output session ID for CLI user
logger.info("Session ID: {}".format(sync_session_client.sync_session.id))
Expand All @@ -435,7 +442,7 @@ def _sync(self, sync_session_client, **options): # noqa: C901
noninteractive,
pull_filter,
)
# and push our own data to server
# and push our own data to server
if not no_push:
self._push(
sync_session_client,
Expand Down
21 changes: 21 additions & 0 deletions kolibri/core/auth/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from django.utils import timezone
from morango.errors import MorangoResumeSyncError
from morango.models import InstanceIDModel
from morango.models.core import SyncSession
from requests.exceptions import ConnectionError
from rest_framework import serializers
from rest_framework import status
Expand Down Expand Up @@ -261,6 +262,26 @@ def validate(self, data):
"args": [data["command"]],
}

def validate_for_restart(self, job):
data = super(SyncJobValidator, self).validate_for_restart(job)

# find the sync_session_id the command added to the job metadata when it ran
sync_session_id = job.extra_metadata.get("sync_session_id")
if sync_session_id:
try:
SyncSession.objects.get(pk=sync_session_id, active=True)
except SyncSession.DoesNotExist:
sync_session_id = None

# if we didn't get an existing active sync_session_id,
# we'll fall back to default functionality
if sync_session_id:
kwargs = data.get("kwargs")
kwargs.update(sync_session_id=sync_session_id)
data.update(args=("resumesync",), kwargs=kwargs)

return data


facility_task_queue = "facility_task"

Expand Down
43 changes: 43 additions & 0 deletions kolibri/core/auth/test/test_auth_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.urls import reverse
from mock import Mock
from mock import patch
from morango.models.core import SyncSession
from requests.exceptions import ConnectionError
from rest_framework import serializers
from rest_framework.exceptions import AuthenticationFailed
Expand All @@ -29,6 +30,7 @@
from kolibri.core.public.constants.user_sync_statuses import QUEUED
from kolibri.core.public.constants.user_sync_statuses import SYNC
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State


DUMMY_PASSWORD = "password"
Expand All @@ -41,6 +43,7 @@
traceback="",
percentage_progress=0,
cancellable=False,
track_progress=True,
extra_metadata={},
func="",
)
Expand Down Expand Up @@ -668,6 +671,46 @@ def test_validate_and_create_sync_credentials_no_credentials(
with self.assertRaises(PermissionDenied):
PeerFacilitySyncJobValidator(data=data).is_valid(raise_exception=True)

def test_validate_for_restart__not_restartable(self):
job = fake_job(state=State.RUNNING)
with self.assertRaises(serializers.ValidationError):
PeerFacilitySyncJobValidator(instance=job).data

def test_validate_for_restart__missing_sync_session(self):
job = fake_job(state=State.FAILED, args=("sync",), kwargs={"test": True})
new_job_data = PeerFacilitySyncJobValidator(instance=job).data
self.assertEqual(new_job_data["args"], ("sync",))
self.assertEqual(new_job_data["kwargs"], {"test": True})

@patch("kolibri.core.auth.tasks.SyncSession.objects.get")
def test_validate_for_restart__inactive_sync_session(self, mock_get):
job = fake_job(
state=State.FAILED,
args=("sync",),
kwargs={"test": True},
extra_metadata={"sync_session_id": "abc123"},
)
mock_get.side_effect = SyncSession.DoesNotExist
new_job_data = PeerFacilitySyncJobValidator(instance=job).data
mock_get.assert_called_once_with(pk="abc123", active=True)
self.assertEqual(new_job_data["args"], ("sync",))
self.assertEqual(new_job_data["kwargs"], {"test": True})

@patch("kolibri.core.auth.tasks.SyncSession.objects.get")
def test_validate_for_restart__resume(self, mock_get):
job = fake_job(
state=State.FAILED,
args=("sync",),
kwargs={"test": True},
extra_metadata={"sync_session_id": "abc123"},
)
new_job_data = PeerFacilitySyncJobValidator(instance=job).data
mock_get.assert_called_once_with(pk="abc123", active=True)
self.assertEqual(new_job_data["args"], ("resumesync",))
self.assertEqual(
new_job_data["kwargs"], {"test": True, "sync_session_id": "abc123"}
)


class TestRequestSoUDSync(TestCase):
def setUp(self):
Expand Down
15 changes: 8 additions & 7 deletions kolibri/core/tasks/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from six import string_types

from kolibri.core.tasks.exceptions import JobNotFound
from kolibri.core.tasks.exceptions import JobNotRestartable
from kolibri.core.tasks.job import State
from kolibri.core.tasks.main import job_storage
from kolibri.core.tasks.registry import TaskRegistry
Expand Down Expand Up @@ -185,13 +184,15 @@ def restart(self, request, pk=None):

job_to_restart = self._get_job_for_pk(request, pk)

try:
restarted_job_id = job_storage.restart_job(job_id=job_to_restart.job_id)
except JobNotRestartable:
raise serializers.ValidationError(
"Cannot restart job with state: {}".format(job_to_restart.state)
)
registered_task = TaskRegistry[job_to_restart.func]
job = registered_task.validate_job_restart(request.user, job_to_restart)

# delete existing task after validation
job_storage.clear(job_id=job_to_restart.job_id, force=False)

restarted_job_id = job_storage.enqueue_job(
job, queue=registered_task.queue, priority=registered_task.priority
)
job_response = self._job_to_response(
job_storage.get_job(job_id=restarted_job_id)
)
Expand Down
4 changes: 0 additions & 4 deletions kolibri/core/tasks/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,3 @@ class UserCancelledError(CancelledError):

class JobNotFound(Exception):
pass


class JobNotRestartable(Exception):
pass
12 changes: 0 additions & 12 deletions kolibri/core/tasks/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,6 @@ def from_json(cls, json_string):

return Job(func, **working_dictionary)

@classmethod
def from_job(cls, job, **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd prefer to leave this method here and reference it elsewhere - so that it can continue to be updated in lockstep with updates to the Job constructor.

For example in this PR where I've added a new parameter to the Job object: https://github.com/learningequality/kolibri/pull/9503/files#diff-09ac69ff66caa119977d3ae21fe8dc8bf59f0eaa27b34fbb002e8446f09ca98bR151

Seeing how you're now using it below, maybe we change it from a class method to a regular method and it becomes to_constructor_args or some such?

if not isinstance(job, cls):
raise TypeError("job must be an instance of {}".format(cls))
kwargs["args"] = copy.copy(job.args)
kwargs["kwargs"] = copy.copy(job.kwargs)
kwargs["track_progress"] = job.track_progress
kwargs["cancellable"] = job.cancellable
kwargs["extra_metadata"] = job.extra_metadata.copy()
kwargs["facility_id"] = job.facility_id
return cls(job.func, **kwargs)

def __init__(
self,
func,
Expand Down
18 changes: 18 additions & 0 deletions kolibri/core/tasks/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,24 @@ def validate_job_data(self, user, data):

return job

def validate_job_restart(self, user, job):
"""
:type user: kolibri.core.auth.models.FacilityUser
:type job: kolibri.core.tasks.job.Job
:return: A new job object for restarting
:rtype: kolibri.core.tasks.job.Job
"""
validator = self.validator(instance=job, context={"user": user})

try:
job = self._ready_job(**validator.data)
except TypeError:
raise serializers.ValidationError(
"Invalid job data returned from validator."
)

return job

def enqueue(self, job=None, **job_kwargs):
"""
Enqueue the function with arguments passed to this method.
Expand Down
28 changes: 0 additions & 28 deletions kolibri/core/tasks/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from kolibri.core.tasks.constants import DEFAULT_QUEUE
from kolibri.core.tasks.exceptions import JobNotFound
from kolibri.core.tasks.exceptions import JobNotRestartable
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import Priority
from kolibri.core.tasks.job import State
Expand Down Expand Up @@ -241,33 +240,6 @@ def get_job(self, job_id):
job, _ = self._get_job_and_orm_job(job_id, session)
return job

def restart_job(self, job_id):
"""
First deletes the job with id = job_id then enqueues a new job with the same
job_id as the one we deleted, with same args and kwargs.

Returns the job_id of enqueued job.

Raises `JobNotRestartable` exception if the job with id = job_id state is
not in CANCELED or FAILED.
"""
with self.session_scope() as session:
job_to_restart, orm_job = self._get_job_and_orm_job(job_id, session)
queue = orm_job.queue
priority = orm_job.priority

if job_to_restart.state in [State.CANCELED, State.FAILED]:
self.clear(job_id=job_to_restart.job_id, force=False)
job = Job.from_job(
job_to_restart,
job_id=job_to_restart.job_id,
)
return self.enqueue_job(job, queue=queue, priority=priority)
else:
raise JobNotRestartable(
"Cannot restart job with state={}".format(job_to_restart.state)
)

def check_job_canceled(self, job_id):
job = self.get_job(job_id)
return job.state == State.CANCELED or job.state == State.CANCELING
Expand Down
26 changes: 0 additions & 26 deletions kolibri/core/tasks/test/taskrunner/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
import time

import pytest
from mock import patch

from kolibri.core.tasks.decorators import register_task
from kolibri.core.tasks.exceptions import JobNotRestartable
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import Priority
from kolibri.core.tasks.job import State
Expand Down Expand Up @@ -142,27 +140,3 @@ def test_gets_oldest_high_priority_job_first(self, defaultbackend, simplejob):
defaultbackend.enqueue_job(simplejob, QUEUE, Priority.HIGH)

assert defaultbackend.get_next_queued_job().job_id == job_id

def test_restart_job(self, defaultbackend, simplejob):
with patch("kolibri.core.tasks.main.job_storage", wraps=defaultbackend):
job_id = defaultbackend.enqueue_job(simplejob, QUEUE)

for state in [
State.COMPLETED,
State.RUNNING,
State.QUEUED,
State.SCHEDULED,
State.CANCELING,
]:
defaultbackend._update_job(job_id, state)
with pytest.raises(JobNotRestartable):
defaultbackend.restart_job(job_id)

for state in [State.CANCELED, State.FAILED]:
defaultbackend._update_job(job_id, state)

restarted_job_id = defaultbackend.restart_job(job_id)
restarted_job = defaultbackend.get_job(restarted_job_id)

assert restarted_job_id == job_id
assert restarted_job.state == State.QUEUED
22 changes: 20 additions & 2 deletions kolibri/core/tasks/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,15 +726,33 @@ def test_retrieval_404(self, mock_job_storage):
def test_restart_task(self, mock_job_storage):
self.client.login(username=self.facility2user.username, password=DUMMY_PASSWORD)

mock_job_storage.restart_job.return_value = self.jobs[2].job_id
self.jobs[2].state = State.FAILED
mock_job_storage.get_job.return_value = self.jobs[2]

def _clear(**kwargs):
self.jobs[2].state = State.QUEUED

mock_job_storage.clear.side_effect = _clear

response = self.client.post(
reverse("kolibri:core:task-restart", kwargs={"pk": "2"}), format="json"
)

self.assertEqual(response.data, self.jobs_response[2])
mock_job_storage.restart_job.assert_called_once_with(job_id="2")
mock_job_storage.clear.assert_called_once_with(job_id="2", force=False)

def test_restart_task__not_restartable(self, mock_job_storage):
self.client.login(username=self.facility2user.username, password=DUMMY_PASSWORD)

mock_job_storage.get_job.return_value = self.jobs[2]

response = self.client.post(
reverse("kolibri:core:task-restart", kwargs={"pk": "2"}), format="json"
)

self.assertEqual(response.status_code, 400)
self.assertEqual(str(response.data[0]), "Cannot restart job with state=QUEUED")
mock_job_storage.clear.assert_not_called()

def test_restart_task_respect_permissions(self, mock_job_storage):
self.client.login(username=self.facility2user.username, password=DUMMY_PASSWORD)
Expand Down
24 changes: 23 additions & 1 deletion kolibri/core/tasks/test/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,15 @@ def test_job_save_as_cancellable__no_storage(self):
self.job.save_as_cancellable(cancellable=cancellable)


class TestingJobValidator(JobValidator):
pass


class TestRegisteredTask(TestCase):
def setUp(self):
self.registered_task = RegisteredTask(
int,
validator=TestingJobValidator,
priority=Priority.HIGH,
queue="test",
permission_classes=[IsSuperAdmin],
Expand All @@ -67,7 +72,7 @@ def setUp(self):

def test_constructor_sets_required_params(self):
self.assertEqual(self.registered_task.func, int)
self.assertEqual(self.registered_task.validator, JobValidator)
self.assertEqual(self.registered_task.validator, TestingJobValidator)
self.assertEqual(self.registered_task.priority, Priority.HIGH)
self.assertTrue(isinstance(self.registered_task.permissions[0], IsSuperAdmin))
self.assertEqual(self.registered_task.job_id, "test")
Expand Down Expand Up @@ -153,3 +158,20 @@ def test_enqueue(self, job_storage_mock, _ready_job_mock):
queue=self.registered_task.queue,
priority=self.registered_task.priority,
)

@mock.patch("kolibri.core.tasks.registry.RegisteredTask._ready_job")
def test_validate_job_restart(self, _ready_job_mock):
mock_user = mock.MagicMock(spec="kolibri.core.auth.models.FacilityUser")
mock_job = mock.MagicMock(spec="kolibri.core.tasks.registry.Job")

_ready_job_mock.return_value = "job"

with mock.patch.object(
TestingJobValidator, "validate_for_restart"
) as mock_validate_for_restart:
mock_validate_for_restart.return_value = {"test": True}
result = self.registered_task.validate_job_restart(mock_user, mock_job)
mock_validate_for_restart.assert_called_once_with(mock_job)

self.assertEqual(result, "job")
_ready_job_mock.assert_called_once_with(test=True)
Loading