From 73bc252289ffa22a60b09cdba49f5b8a5add1580 Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Tue, 17 Aug 2021 08:56:55 -0700 Subject: [PATCH] Fix cert filter handling, alter sync resume logic --- kolibri/core/auth/management/utils.py | 6 +- .../core/discovery/utils/network/search.py | 20 +++-- kolibri/core/public/utils.py | 88 +++++++++++++++---- 3 files changed, 88 insertions(+), 26 deletions(-) diff --git a/kolibri/core/auth/management/utils.py b/kolibri/core/auth/management/utils.py index bc43b5c595..4ced8538b9 100644 --- a/kolibri/core/auth/management/utils.py +++ b/kolibri/core/auth/management/utils.py @@ -483,7 +483,7 @@ def _sync(self, sync_session_client, **options): # noqa: C901 client_cert = sync_session_client.sync_session.client_certificate register_sync_event_handlers(sync_session_client.controller) - sync_filter = get_sync_filter(client_cert) + client_cert_scope = client_cert.get_scope() scope_params = json.loads(client_cert.scope_params) dataset_id = scope_params.get("dataset_id") @@ -503,14 +503,14 @@ def _sync(self, sync_session_client, **options): # noqa: C901 self._pull( sync_session_client, noninteractive, - sync_filter, + client_cert_scope.read_filter, ) # and push our own data to server if not no_push: self._push( sync_session_client, noninteractive, - sync_filter, + client_cert_scope.write_filter, ) if not no_provision: diff --git a/kolibri/core/discovery/utils/network/search.py b/kolibri/core/discovery/utils/network/search.py index 6c8ede75dd..8ba839c203 100644 --- a/kolibri/core/discovery/utils/network/search.py +++ b/kolibri/core/discovery/utils/network/search.py @@ -18,6 +18,7 @@ from kolibri.core.device.utils import get_device_setting from kolibri.core.discovery.models import DynamicNetworkLocation from kolibri.core.public.utils import begin_request_soud_sync +from kolibri.core.public.utils import cleanup_server_soud_sync from kolibri.core.public.utils import get_device_info from kolibri.core.public.utils import 
stop_request_soud_sync @@ -239,13 +240,18 @@ def remove_service(self, zeroconf, type, name): if id in self.instances: if not get_is_self(id): instance = self.instances[id] - if get_device_setting( - "subset_of_users_device", False - ) and not instance.get("subset_of_users_device", False): - for user in FacilityUser.objects.all().values("id"): - stop_request_soud_sync( - server=instance.get("base_url"), user=user["id"] - ) + is_soud = instance.get("subset_of_users_device", False) + + if get_device_setting("subset_of_users_device", False): + if not is_soud: + for user in FacilityUser.objects.all().values("id"): + stop_request_soud_sync( + server=instance.get("base_url"), user=user["id"] + ) + elif is_soud: + # this means our device is not SoUD, and instance is a SoUD + cleanup_server_soud_sync(instance) + del self.instances[id] except KeyError: pass diff --git a/kolibri/core/public/utils.py b/kolibri/core/public/utils.py index 8b8e1c2ed9..07d332e8c9 100644 --- a/kolibri/core/public/utils.py +++ b/kolibri/core/public/utils.py @@ -1,5 +1,6 @@ import datetime import hashlib +import json import logging import platform import random @@ -9,9 +10,13 @@ from django.core.urlresolvers import reverse from django.utils import timezone from morango.models import InstanceIDModel +from morango.models import SyncSession +from morango.models import TransferSession from rest_framework import status import kolibri +from kolibri.core.auth.constants.morango_sync import PROFILE_FACILITY_DATA +from kolibri.core.auth.constants.morango_sync import ScopeDefinitions from kolibri.core.auth.models import FacilityUser from kolibri.core.device.models import UserSyncStatus from kolibri.core.device.utils import DeviceNotProvisioned @@ -90,6 +95,45 @@ def get_device_info(version=DEVICE_INFO_VERSION): return info +def find_soud_sync_sessions(**filters): + """ + :param filters: A dict of queryset filter + :return: A SyncSession queryset + """ + return SyncSession.objects.filter( + active=True, + 
connection_kind="network", + profile=PROFILE_FACILITY_DATA, + client_certificate__scope_definition_id=ScopeDefinitions.SINGLE_USER, + **filters + ).order_by("-last_activity_timestamp") + + +def find_soud_sync_session_for_resume(user, base_url): + """ + Finds the most recently active sync session for a SoUD sync + + :type user: FacilityUser + :type base_url: str + :rtype: SyncSession|None + """ + # SoUD requests sync with server, so for resume we filter by client and matching base url + sync_sessions = find_soud_sync_sessions( + is_server=False, + connection_path__startswith=base_url, + ) + + # ensure the certificate is for the user we're checking for + for sync_session in sync_sessions: + scope_params = json.loads(sync_session.client_certificate.scope_params) + dataset_id = scope_params.get("dataset_id") + user_id = scope_params.get("user_id", None) + if user_id == user.id and user.dataset_id == dataset_id: + return sync_session + + return None + + def peer_sync(**kwargs): try: call_command("sync", **kwargs) @@ -135,16 +179,13 @@ def startpeerusersync( ) job_data = None # attempt to resume an existing session - try: - sync_session = UserSyncStatus.objects.get(user=user).sync_session - if sync_session and sync_session.active: - command = "resumesync" - # if resuming encounters an error, it should close the session to avoid a loop - job_data = prepare_soud_resume_sync_job( - server, sync_session.id, close_on_error=True, **common_job_args - ) - except UserSyncStatus.DoesNotExist: - pass + sync_session = find_soud_sync_session_for_resume(user, server) + if sync_session is not None: + command = "resumesync" + # if resuming encounters an error, it should close the session to avoid a loop + job_data = prepare_soud_resume_sync_job( + server, sync_session.id, close_on_error=True, **common_job_args + ) # if not resuming, prepare normal job if job_data is None: @@ -160,12 +201,11 @@ def stoppeerusersync(server, user_id): """ Close the sync session with a server """ - # skip if 
no sync status, no sync session, or sync session is inactive - try: - sync_session = UserSyncStatus.objects.get(user=user_id).sync_session - if not sync_session or not sync_session.active: - return - except UserSyncStatus.DoesNotExist: + user = FacilityUser.objects.get(pk=user_id) + sync_session = find_soud_sync_session_for_resume(user, server) + + # skip if we couldn't find one for resume + if sync_session is None: return # hack: queue the resume job, without push or pull, and without keep_alive, so it should close @@ -359,3 +399,19 @@ def schedule_new_sync(server, user, interval=OPTIONS["Deployment"]["SYNC_INTERVA JOB_ID = hashlib.md5("{}:{}".format(server, user).encode()).hexdigest() job = Job(request_soud_sync, server, user, job_id=JOB_ID) scheduler.enqueue_in(dt, job) + + +def cleanup_server_soud_sync(device_info): + """ + :param device_info: Client's device info + """ + sync_sessions = find_soud_sync_sessions(is_server=True) + for sync_session in sync_sessions: + if ( + TransferSession.objects.filter( + sync_session=sync_session, active=True + ).count() + == 0 + ): + sync_session.active = False + sync_session.save()