Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SoUD intervals #8262

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions kolibri/core/auth/management/commands/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from kolibri.core.auth.models import dataset_cache
from kolibri.core.auth.sync_event_hook_utils import register_sync_event_handlers
from kolibri.core.logger.utils.data import bytes_for_humans
from kolibri.core.public.utils import schedule_new_sync
from kolibri.core.tasks.exceptions import UserCancelledError
from kolibri.core.tasks.management.commands.base import AsyncCommand
from kolibri.core.utils.lock import db_lock
Expand Down Expand Up @@ -81,6 +82,12 @@ def add_arguments(self, parser):
action="store_true",
help="do not create a facility and temporary superuser",
)
parser.add_argument(
"--resync-interval",
type=int,
default=None,
help="Seconds to schedule a new sync",
)
# parser.add_argument("--scope-id", type=str, default=FULL_FACILITY)

def handle_async(self, *args, **options): # noqa C901
Expand All @@ -96,6 +103,7 @@ def handle_async(self, *args, **options): # noqa C901
no_pull,
noninteractive,
no_provision,
resync_interval,
) = (
options["baseurl"],
options["facility"],
Expand All @@ -107,6 +115,7 @@ def handle_async(self, *args, **options): # noqa C901
options["no_pull"],
options["noninteractive"],
options["no_provision"],
options["resync_interval"],
)

PORTAL_SYNC = baseurl == DATA_PORTAL_SYNCING_BASE_URL
Expand Down Expand Up @@ -269,6 +278,8 @@ def handle_async(self, *args, **options): # noqa C901
self.job.save_meta()

dataset_cache.deactivate()
if user_id and resync_interval:
schedule_new_sync(baseurl, user_id, resync_interval)
logger.info("Syncing has been completed.")

@contextmanager
Expand Down
15 changes: 6 additions & 9 deletions kolibri/core/device/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
from kolibri.core.content.permissions import CanManageContent
from kolibri.core.device.utils import get_device_setting
from kolibri.core.discovery.models import DynamicNetworkLocation
from kolibri.core.public.constants.user_sync_options import DELAYED_SYNC
from kolibri.core.public.constants.user_sync_statuses import NOT_RECENTLY_SYNCED
from kolibri.core.public.constants.user_sync_statuses import QUEUED
from kolibri.core.public.constants.user_sync_statuses import RECENTLY_SYNCED
from kolibri.core.public.constants.user_sync_statuses import SYNCING
from kolibri.utils.conf import OPTIONS
from kolibri.utils.server import get_urls
from kolibri.utils.server import installation_type
Expand Down Expand Up @@ -197,12 +202,6 @@ class Meta:
fields = ["user", "member_of"]


RECENTLY_SYNCED = "RECENTLY_SYNCED"
SYNCING = "SYNCING"
QUEUED = "QUEUED"
NOT_RECENTLY_SYNCED = "NOT_RECENTLY_SYNCED"


def map_status(status):
"""
Summarize the current state of the sync into a constant for use by
Expand All @@ -213,9 +212,7 @@ def map_status(status):
elif status["queued"]:
return QUEUED
elif status["last_synced"]:
# Keep this as a fixed constant for now.
# In future versions this may be configurable.
if timezone.now() - status["last_synced"] < timedelta(minutes=15):
if timezone.now() - status["last_synced"] < timedelta(seconds=DELAYED_SYNC):
return RECENTLY_SYNCED
else:
return NOT_RECENTLY_SYNCED
Expand Down
12 changes: 7 additions & 5 deletions kolibri/core/public/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from django.http import HttpResponse
from django.http import HttpResponseBadRequest
from django.http import HttpResponseNotFound
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.gzip import gzip_page
from morango.models.core import TransferSession
Expand All @@ -31,9 +32,9 @@
from kolibri.core.device.models import SyncQueue
from kolibri.core.device.models import UserSyncStatus
from kolibri.core.device.utils import allow_peer_unlisted_channel_import

MAX_CONCURRENT_SYNCS = 1
HANDSHAKING_TIME = 5
from kolibri.core.public.constants.user_sync_options import HANDSHAKING_TIME
from kolibri.core.public.constants.user_sync_options import MAX_CONCURRENT_SYNCS
from kolibri.utils.conf import OPTIONS


class InfoViewSet(viewsets.ViewSet):
Expand Down Expand Up @@ -188,13 +189,14 @@ def list(self, request):
return Response(queue)

def check_queue(self, pk=None):
last_activity = datetime.datetime.now() - datetime.timedelta(minutes=5)
sync_interval = OPTIONS["Deployment"]["SYNC_INTERVAL"]
last_activity = timezone.now() - datetime.timedelta(minutes=5)
current_transfers = TransferSession.objects.filter(
active=True, last_activity_timestamp__gte=last_activity
).count()
if current_transfers < MAX_CONCURRENT_SYNCS:
allow_sync = True
data = {"action": SYNC}
data = {"action": SYNC, "sync_interval": sync_interval}
else:
# polling time at least HANDSHAKING_TIME seconds per position in the queue to
# be greater than the time needed for the handshake part of the ssl protocol
Expand Down
10 changes: 10 additions & 0 deletions kolibri/core/public/constants/user_sync_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
This module contains constants representing options for SoUD sync
"""
from __future__ import unicode_literals

# Client side: recommended number of seconds to wait between two syncs
SYNC_INTERVAL = 5
# Client side: seconds since the last sync after which it is marked as not recent
DELAYED_SYNC = 900

# Server side: maximum number of concurrent syncs allowed
MAX_CONCURRENT_SYNCS = 1
# Server side: minimum time (seconds) considered as the ttl for the next sync
# request from an enqueued client
HANDSHAKING_TIME = 5
7 changes: 5 additions & 2 deletions kolibri/core/public/constants/user_sync_statuses.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@
from __future__ import unicode_literals


SYNC = "sync" # can begin a sync right now
QUEUED = "queued" # request added to the queue
SYNC = "SYNC" # can begin a sync right now
RECENTLY_SYNCED = "RECENTLY_SYNCED"
SYNCING = "SYNCING"
QUEUED = "QUEUED"
NOT_RECENTLY_SYNCED = "NOT_RECENTLY_SYNCED"
106 changes: 79 additions & 27 deletions kolibri/core/public/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import requests
from django.core.management import call_command
from django.core.urlresolvers import reverse
from django.utils import timezone
from morango.models import InstanceIDModel
from rest_framework import status

Expand All @@ -20,8 +21,10 @@
from kolibri.core.tasks.api import prepare_soud_sync_job
from kolibri.core.tasks.api import prepare_sync_task
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State
from kolibri.core.tasks.main import queue
from kolibri.core.tasks.main import scheduler
from kolibri.utils.conf import OPTIONS


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,7 +89,9 @@ def get_device_info(version=DEVICE_INFO_VERSION):
return info


def startpeerusersync(server, user_id):
def startpeerusersync(
server, user_id, resync_interval=OPTIONS["Deployment"]["SYNC_INTERVAL"]
):
"""
Initiate a SYNC (PULL + PUSH) of a specific user from another device.
"""
Expand All @@ -110,7 +115,7 @@ def startpeerusersync(server, user_id):
job_data = prepare_soud_sync_job(
server, facility_id, user_id, extra_metadata=extra_metadata
)

job_data["resync_interval"] = resync_interval
job_id = queue.enqueue(call_command, "sync", **job_data)

return job_id
Expand All @@ -126,11 +131,46 @@ def begin_request_soud_sync(server, user):
# this does not make sense unless this is a SoUD
logger.warn("Only Subsets of Users Devices can do this")
return
logger.info("Queuing SoUD syncing request")
users = UserSyncStatus.objects.filter(user_id=user).values(
"queued", "sync_session__last_activity_timestamp"
)
if users:
SYNC_INTERVAL = OPTIONS["Deployment"]["SYNC_INTERVAL"]
dt = datetime.timedelta(seconds=SYNC_INTERVAL)
if timezone.now() - users[0]["sync_session__last_activity_timestamp"] < dt:
schedule_new_sync(server, user)
return

if users[0]["queued"]:
failed_jobs = [
j
for j in queue.jobs
if j.state == State.FAILED
and j.extra_metadata.get("started_by", None) == user
and j.extra_metadata.get("type", None) == "SYNCPEER/SINGLE"
]
queued_jobs = [
j
for j in queue.jobs
if j.state == State.QUEUED
and j.extra_metadata.get("started_by", None) == user
and j.extra_metadata.get("type", None) == "SYNCPEER/SINGLE"
]
if failed_jobs:
for j in failed_jobs:
queue.clear_job(j.job_id)
# if previous sync jobs have failed, unblock UserSyncStatus to try again:
UserSyncStatus.objects.update_or_create(
user_id=user, defaults={"queued": False}
)
elif queued_jobs:
return # If there are pending and not failed jobs, don't enqueue a new one

logger.info("Queuing SoUD syncing request for user {}".format(user))
queue.enqueue(request_soud_sync, server, user)


def request_soud_sync(server, user=None, queue_id=None, ttl=10):
def request_soud_sync(server, user, queue_id=None, ttl=10):
"""
Make a request to the serverurl endpoint to sync this SoUD (Subset of Users Device)
- If the server says "sync now" immediately queue a sync task for the server
Expand Down Expand Up @@ -171,35 +211,47 @@ def request_soud_sync(server, user=None, queue_id=None, ttl=10):
)
return

if response.status_code == status.HTTP_404_NOT_FOUND:
return # Request done to a server not owning this user's data

if response.status_code == status.HTTP_200_OK:
handle_server_sync_response(response, server, user)


def handle_server_sync_response(response, server, user):
# In either case, we set the sync status for this user as queued
# Once the sync starts, then this should get cleared and the SyncSession
# set on the status, so that more info can be garnered.
response_content = (
response.content.decode()
if isinstance(response.content, bytes)
else response.content
)
server_response = json.loads(response_content or "{}")
if response.status_code == status.HTTP_404_NOT_FOUND:
return # Request done to a server not owning this user's data
UserSyncStatus.objects.update_or_create(user_id=user, defaults={"queued": True})

if response.status_code == status.HTTP_200_OK:
# In either case, we set the sync status for this user as queued
# Once the sync starts, then this should get cleared and the SyncSession
# set on the status, so that more info can be garnered.
UserSyncStatus.objects.update_or_create(user_id=user, defaults={"queued": True})

if server_response["action"] == SYNC:
job_id = startpeerusersync(server, user)
logger.info(
"Enqueuing a sync task for user {} in job {}".format(user, job_id)
if server_response["action"] == SYNC:
server_sync_interval = server_response.get(
"sync_interval", str(OPTIONS["Deployment"]["SYNC_INTERVAL"])
)
job_id = startpeerusersync(server, user, server_sync_interval)
logger.info("Enqueuing a sync task for user {} in job {}".format(user, job_id))

elif server_response["action"] == QUEUED:
pk = server_response["id"]
time_alive = server_response["keep_alive"]
dt = datetime.timedelta(seconds=int(time_alive))
job = Job(request_soud_sync, server, user, pk)
scheduler.enqueue_in(dt, job)
logger.info(
"Server busy, will try again in {} seconds with pk={}".format(
time_alive, pk
)
)

elif server_response["action"] == QUEUED:
pk = server_response["id"]
time_alive = server_response["keep_alive"]
dt = datetime.timedelta(seconds=int(time_alive))
job = Job(request_soud_sync, server, user, pk)
scheduler.enqueue_in(dt, job)
logger.info(
"Server busy, will try again in {} seconds with pk={}".format(
time_alive, pk
)
)

def schedule_new_sync(server, user, interval=OPTIONS["Deployment"]["SYNC_INTERVAL"]):
    """
    Reschedule the SoUD sync process by enqueuing a delayed sync request.

    :param server: server argument forwarded to ``request_soud_sync``
    :param user: user argument forwarded to ``request_soud_sync``
    :param interval: number of seconds to wait before the request is run.
        Defaults to the ``SYNC_INTERVAL`` deployment option (evaluated when
        this function is defined). Numeric strings such as ``"5"`` are
        accepted as well as numbers.
    :raises ValueError: if ``interval`` is a string that is not numeric
    """
    # The interval can reach this function as a string (for instance when it
    # comes from a string fallback or a decoded server response), and
    # datetime.timedelta raises TypeError for non-numeric arguments.
    delay = datetime.timedelta(seconds=float(interval))
    job = Job(request_soud_sync, server, user)
    scheduler.enqueue_in(delay, job)
7 changes: 7 additions & 0 deletions kolibri/utils/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,13 @@ def url_prefix(value):
Server configuration should handle ensuring that the files are properly served.
""",
},
"SYNC_INTERVAL": {
"type": "integer",
"default": 5,
"description": """
In case a SoUD connects to this server, the SoUD should use this interval to resync every user.
""",
},
},
"Python": {
"PICKLE_PROTOCOL": {
Expand Down