-
Notifications
You must be signed in to change notification settings - Fork 716
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Content import peer discovery #6011
Changes from all commits
a543930
2c3c0cf
2922c0f
e923e50
7d40f1e
ffe47c7
bb2d08a
6648666
a75293d
8ae6bbc
df7523b
2881db3
6dec95c
7c07682
30e13a8
b51c9a0
52eed34
a40c1fc
ae7bb70
9e13f66
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,21 @@ | ||
from rest_framework import viewsets | ||
from rest_framework.response import Response | ||
|
||
from .models import NetworkLocation | ||
from .serializers import NetworkLocationSerializer | ||
from kolibri.core.content.permissions import CanManageContent | ||
from kolibri.core.device.permissions import UserHasAnyDevicePermissions | ||
from kolibri.core.discovery.utils.network.search import get_available_instances | ||
|
||
|
||
class NetworkLocationViewSet(viewsets.ModelViewSet):
    """CRUD API for saved network locations (known peer Kolibri devices).

    Restricted to users with content-management permission.
    """

    permission_classes = (CanManageContent,)
    serializer_class = NetworkLocationSerializer
    queryset = NetworkLocation.objects.all()
|
||
|
||
class NetworkSearchViewSet(viewsets.ViewSet):
    """Read-only endpoint listing Kolibri instances discovered on the local network."""

    permission_classes = (UserHasAnyDevicePermissions,)

    def list(self, request):
        # Delegates to the zeroconf discovery utility; this call may block
        # while it probes each discovered peer's port.
        return Response(get_available_instances())
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,11 @@ | ||
from rest_framework import routers | ||
|
||
from .api import NetworkLocationViewSet | ||
from .api import NetworkSearchViewSet | ||
|
||
# Wire up the discovery viewsets under /networklocation and /networksearch.
router = routers.SimpleRouter()

# NOTE(review): `base_name` was renamed to `basename` in DRF 3.9 — confirm the
# pinned rest_framework version still accepts this keyword.
router.register(r"networklocation", NetworkLocationViewSet, base_name="networklocation")
router.register(r"networksearch", NetworkSearchViewSet, base_name="networksearch")

urlpatterns = router.urls
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
import socket | ||
|
||
import mock | ||
from django.test import TestCase | ||
from zeroconf import BadTypeInNameException | ||
from zeroconf import service_type_name | ||
from zeroconf import ServiceInfo | ||
from zeroconf import Zeroconf | ||
|
||
from ..utils.network.search import _id_from_name | ||
from ..utils.network.search import get_available_instances | ||
from ..utils.network.search import initialize_zeroconf_listener | ||
from ..utils.network.search import KolibriZeroconfService | ||
from ..utils.network.search import LOCAL_DOMAIN | ||
from ..utils.network.search import NonUniqueNameException | ||
from ..utils.network.search import register_zeroconf_service | ||
from ..utils.network.search import SERVICE_TYPE | ||
from ..utils.network.search import unregister_zeroconf_service | ||
from ..utils.network.search import ZEROCONF_STATE | ||
|
||
# Fixed values used by the zeroconf mocks and assertions below.
MOCK_INTERFACE_IP = "111.222.111.222"
MOCK_PORT = 555
MOCK_ID = "abba"
|
||
|
||
class MockServiceBrowser(object):
    """Stand-in for zeroconf.ServiceBrowser: validates its arguments like the
    real browser but performs no network activity."""

    def __init__(self, zc, type_, handlers=None, listener=None):
        assert handlers or listener, "You need to specify at least one handler"
        if not type_.endswith(service_type_name(type_)):
            raise BadTypeInNameException
        self.zc = zc
        self.type = type_

    def cancel(self):
        # Mirrors ServiceBrowser.cancel by detaching itself from the zeroconf object.
        self.zc.remove_listener(self)
|
||
|
||
class MockZeroconf(Zeroconf):
    """In-memory stand-in for zeroconf.Zeroconf: tracks browsers and services
    in plain dicts instead of opening sockets."""

    def __init__(self, *args, **kwargs):
        # Deliberately does NOT call super().__init__, so no network resources
        # are created.
        self.browsers = {}
        self.services = {}

    def get_service_info(self, type_, name, timeout=3000):
        # Fabricate a ServiceInfo for the requested name from the mock constants.
        id = _id_from_name(name)
        info = ServiceInfo(
            SERVICE_TYPE,
            name=".".join([id, SERVICE_TYPE]),
            server=".".join([id, LOCAL_DOMAIN, ""]),
            address=socket.inet_aton(MOCK_INTERFACE_IP),
            port=MOCK_PORT,
            properties={"facilities": "[]", "channels": "[]"},
        )
        return info

    def add_service_listener(self, type_, listener):
        self.remove_service_listener(listener)
        # NOTE(review): `listener` is passed positionally into MockServiceBrowser's
        # `handlers` parameter; it satisfies the browser's assert, but was
        # presumably meant as the `listener=` keyword — confirm.
        self.browsers[listener] = MockServiceBrowser(self, type_, listener)
        # Replay already-known services to the new listener, as real zeroconf does.
        for info in self.services.values():
            listener.add_service(self, info.type, info.name)

    def register_service(self, info, ttl=60, allow_name_change=False):
        self.check_service(info, allow_name_change)
        self.services[info.name.lower()] = info
        for listener in self.browsers:
            listener.add_service(self, info.type, info.name)

    def unregister_service(self, info):
        for listener in self.browsers:
            listener.remove_service(self, info.type, info.name)

    def check_service(self, info, allow_name_change):
        # Mirrors Zeroconf.check_service's name-conflict resolution.
        service_name = service_type_name(info.name)
        if not info.type.endswith(service_name):
            raise BadTypeInNameException

        instance_name = info.name[: -len(service_name) - 1]
        next_instance_number = 2

        # check for a name conflict
        while info.name.lower() in self.services:

            if not allow_name_change:
                raise NonUniqueNameException

            # change the name and look for a conflict
            info.name = "%s-%s.%s" % (instance_name, next_instance_number, info.type)
            next_instance_number += 1
            service_type_name(info.name)
|
||
|
||
@mock.patch(
    "kolibri.core.discovery.utils.network.search._is_port_open", lambda *a, **kw: True
)
@mock.patch("kolibri.core.discovery.utils.network.search.Zeroconf", MockZeroconf)
@mock.patch(
    "kolibri.core.discovery.utils.network.search.get_all_addresses",
    lambda: [MOCK_INTERFACE_IP],
)
class TestNetworkSearch(TestCase):
    """Exercises zeroconf registration/discovery against the in-memory mocks:
    no real network traffic, and every probed port reports as open."""

    def test_initialize_zeroconf_listener(self):
        assert ZEROCONF_STATE["listener"] is None
        initialize_zeroconf_listener()
        assert ZEROCONF_STATE["listener"] is not None

    def test_register_zeroconf_service(self):
        assert len(get_available_instances()) == 0
        initialize_zeroconf_listener()
        register_zeroconf_service(MOCK_PORT, MOCK_ID)
        # Once registered, our own instance is discoverable and flagged as
        # both "local" (on one of our interfaces) and "self".
        assert get_available_instances() == [
            {
                "id": MOCK_ID,
                "ip": MOCK_INTERFACE_IP,
                "local": True,
                "self": True,
                "port": MOCK_PORT,
                "host": ".".join([MOCK_ID, LOCAL_DOMAIN]),
                "data": {"facilities": [], "channels": []},
                "base_url": "http://{ip}:{port}/".format(
                    ip=MOCK_INTERFACE_IP, port=MOCK_PORT
                ),
            }
        ]
        # Re-registering replaces the old advertisement; unregistering clears it.
        register_zeroconf_service(MOCK_PORT, MOCK_ID)
        unregister_zeroconf_service()
        assert len(get_available_instances()) == 0

    def test_naming_conflict(self):
        assert not ZEROCONF_STATE["listener"]
        service1 = KolibriZeroconfService(id=MOCK_ID, port=MOCK_PORT)
        service1.register()
        assert len(get_available_instances()) == 1
        service2 = KolibriZeroconfService(id=MOCK_ID, port=MOCK_PORT)
        service2.register()
        assert len(get_available_instances()) == 2
        # The colliding service should have been renamed with a "-2" suffix.
        assert service1.id + "-2" == service2.id
        service1.unregister()
        service2.unregister()

    def test_irreconcilable_naming_conflict(self):
        # Occupy enough ids that the "-2" ... suffix fallback runs out of
        # attempts and registration gives up.
        services = [KolibriZeroconfService(id=MOCK_ID, port=MOCK_PORT).register()]
        for i in range(110):
            services.append(
                KolibriZeroconfService(
                    id="-".join([MOCK_ID, str(i)]), port=MOCK_PORT
                ).register()
            )
        with self.assertRaises(NonUniqueNameException):
            KolibriZeroconfService(id=MOCK_ID, port=MOCK_PORT).register()
        for service in services:
            service.unregister()

    def test_excluding_local(self):
        initialize_zeroconf_listener()
        register_zeroconf_service(MOCK_PORT, MOCK_ID)
        assert len(get_available_instances()) == 1
        assert len(get_available_instances(include_local=False)) == 0
        unregister_zeroconf_service()

    def tearDown(self):
        # Reset module-level zeroconf state so tests stay independent.
        unregister_zeroconf_service()
        ZEROCONF_STATE["zeroconf"] = None
        ZEROCONF_STATE["listener"] = None
        ZEROCONF_STATE["service"] = None
        super(TestNetworkSearch, self).tearDown()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
import atexit | ||
import json | ||
import logging | ||
import socket | ||
import time | ||
from contextlib import closing | ||
|
||
from zeroconf import get_all_addresses | ||
from zeroconf import NonUniqueNameException | ||
from zeroconf import ServiceInfo | ||
from zeroconf import USE_IP_OF_OUTGOING_INTERFACE | ||
from zeroconf import Zeroconf | ||
|
||
from kolibri.core.auth.models import Facility | ||
from kolibri.core.content.models import ChannelMetadata | ||
|
||
logger = logging.getLogger(__name__)

# mDNS service type under which Kolibri instances advertise themselves.
SERVICE_TYPE = "Kolibri._sub._http._tcp.local."
# Domain suffix used to build per-instance server names (e.g. "abba.kolibri.local").
LOCAL_DOMAIN = "kolibri.local"

# Module-level singletons: the Zeroconf engine, our network listener, and our
# own registered service advertisement (each None until initialized).
ZEROCONF_STATE = {"zeroconf": None, "listener": None, "service": None}
|
||
|
||
def _id_from_name(name):
    """Extract the Kolibri instance id from a full mDNS service name.

    :param name: full service name, e.g. "abba.Kolibri._sub._http._tcp.local."
    :returns: the leading instance id (e.g. "abba")
    :raises ValueError: if ``name`` does not end with SERVICE_TYPE
    """
    if not name.endswith(SERVICE_TYPE):
        # Raise rather than assert: asserts are stripped under `python -O`,
        # which would let a malformed name through silently.
        raise ValueError("Invalid service name; must end with '%s'" % SERVICE_TYPE)
    # Slice only the trailing SERVICE_TYPE off; str.replace would also remove
    # any accidental occurrence of the service type earlier in the name.
    return name[: -len(SERVICE_TYPE)].strip(".")
|
||
|
||
def _is_port_open(host, port, timeout=1): | ||
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: | ||
sock.settimeout(timeout) | ||
return sock.connect_ex((host, port)) == 0 | ||
|
||
|
||
class KolibriZeroconfService(object):
    """Wraps a zeroconf (mDNS) advertisement of this Kolibri instance."""

    # ServiceInfo while registered on the network; None otherwise.
    info = None

    def __init__(self, id, port=8080, data=None):
        """
        :param id: unique instance id, used as the mDNS instance name
        :param port: port our HTTP service listens on
        :param data: optional dict of extra properties; values are JSON-encoded
            for the zeroconf TXT record. (Default changed from a mutable ``{}``
            literal to ``None`` to avoid the shared-mutable-default pitfall;
            behavior for callers is unchanged.)
        """
        self.id = id
        self.port = port
        data = {} if data is None else data
        self.data = {key: json.dumps(val) for (key, val) in data.items()}
        # Make sure the advertisement is withdrawn on interpreter exit.
        atexit.register(self.cleanup)

    def register(self):
        """Advertise this service on the network.

        On a name conflict, retries with "-2", "-3", ... suffixes, giving up
        with NonUniqueNameException after 100 attempts.

        :returns: self, for chaining
        """

        if not ZEROCONF_STATE["zeroconf"]:
            initialize_zeroconf_listener()

        assert self.info is None, "Service is already registered!"

        i = 1
        id = self.id

        while not self.info:

            # attempt to create an mDNS service and register it on the network
            try:
                info = ServiceInfo(
                    SERVICE_TYPE,
                    name=".".join([id, SERVICE_TYPE]),
                    server=".".join([id, LOCAL_DOMAIN, ""]),
                    address=USE_IP_OF_OUTGOING_INTERFACE,
                    port=self.port,
                    properties=self.data,
                )

                ZEROCONF_STATE["zeroconf"].register_service(info, ttl=60)

                self.info = info

            except NonUniqueNameException:
                # if there's a name conflict, append incrementing integer until no conflict
                i += 1
                id = "%s-%d" % (self.id, i)

                if i > 100:
                    raise NonUniqueNameException()

        # Record the (possibly suffixed) id that actually got registered.
        self.id = id

        return self

    def unregister(self):
        """Withdraw our advertisement from the network."""

        assert self.info is not None, "Service is not registered!"

        ZEROCONF_STATE["zeroconf"].unregister_service(self.info)

        self.info = None

    def cleanup(self, *args, **kwargs):
        """Best-effort unregister; safe to call repeatedly (used as atexit hook)."""

        if self.info and ZEROCONF_STATE["zeroconf"]:
            self.unregister()
|
||
|
||
class KolibriZeroconfListener(object):
    """Collects Kolibri instances seen on the zeroconf network, keyed by id."""

    def __init__(self):
        # Per-listener mapping of instance id -> info dict. Moved out of the
        # class body: a class-level ``instances = {}`` is shared by every
        # listener instance, so stale entries would leak across listeners
        # (e.g. between test runs after ZEROCONF_STATE is reset).
        self.instances = {}

    def add_service(self, zeroconf, type, name):
        """Record a newly discovered Kolibri instance."""
        info = zeroconf.get_service_info(type, name)
        id = _id_from_name(name)
        ip = socket.inet_ntoa(info.address)
        self.instances[id] = {
            "id": id,
            "ip": ip,
            # "local" means the instance lives on one of this machine's interfaces.
            "local": ip in get_all_addresses(),
            "port": info.port,
            "host": info.server.strip("."),
            # TXT-record values were JSON-encoded at registration time.
            "data": {key: json.loads(val) for (key, val) in info.properties.items()},
            "base_url": "http://{ip}:{port}/".format(ip=ip, port=info.port),
        }
        logger.info(
            "Kolibri instance '%s' joined zeroconf network; service info: %s\n"
            % (id, self.instances[id])
        )

    def remove_service(self, zeroconf, type, name):
        """Drop an instance that has left the network."""
        id = _id_from_name(name)
        logger.info("\nKolibri instance '%s' has left the zeroconf network.\n" % (id,))
        if id in self.instances:
            del self.instances[id]
|
||
|
||
def get_available_instances(timeout=2, include_local=True):
    """Retrieve a list of dicts with information about the discovered Kolibri instances on the local network,
    filtering out those that can't be accessed at the specified port (via attempting to open a socket)."""
    if not ZEROCONF_STATE["listener"]:
        # First call: start listening and give mDNS a moment to gather responses.
        initialize_zeroconf_listener()
        time.sleep(3)
    own_service = ZEROCONF_STATE["service"]
    available = []
    for instance in ZEROCONF_STATE["listener"].instances.values():
        if not include_local and instance["local"]:
            continue
        if not _is_port_open(instance["ip"], instance["port"], timeout=timeout):
            continue
        # Mark whether this entry is our own registered service.
        instance["self"] = own_service and own_service.id == instance["id"]
        available.append(instance)
    return available
|
||
|
||
def register_zeroconf_service(port, id):
    """Advertise this Kolibri instance on the local network via zeroconf.

    Replaces any existing advertisement. The service properties carry the
    facility and available-channel listings so peers can show them without an
    extra HTTP round trip.

    :param port: port our HTTP service listens on
    :param id: unique instance id
    """
    if ZEROCONF_STATE["service"] is not None:
        unregister_zeroconf_service()
    logger.info("Registering ourselves to zeroconf network with id '%s'..." % id)
    data = {
        "facilities": list(Facility.objects.values("id", "dataset_id", "name")),
        "channels": list(
            ChannelMetadata.objects.filter(root__available=True).values("id", "name")
        ),
    }
    ZEROCONF_STATE["service"] = KolibriZeroconfService(id=id, port=port, data=data)
    ZEROCONF_STATE["service"].register()
|
||
|
||
def unregister_zeroconf_service():
    """Withdraw our zeroconf advertisement, if any (safe to call when none exists)."""
    logger.info("Unregistering ourselves from zeroconf network...")
    service = ZEROCONF_STATE["service"]
    if service is not None:
        service.cleanup()
        ZEROCONF_STATE["service"] = None
|
||
|
||
def initialize_zeroconf_listener():
    """Create the module-level Zeroconf engine and attach a listener that
    watches for other Kolibri instances on the network."""
    engine = Zeroconf()
    watcher = KolibriZeroconfListener()
    ZEROCONF_STATE["zeroconf"] = engine
    ZEROCONF_STATE["listener"] = watcher
    engine.add_service_listener(SERVICE_TYPE, watcher)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a little local cache so that this check isn't done too frequently for a given peer, but so that the check is done:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest having it be a "shared" cache (not just local memory), using Django's cache backend, because it might be different web service processes handling the sequential queries.