Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: integrate into django-channels #2405

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
9 changes: 8 additions & 1 deletion umap/asgi.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import os

from channels.routing import ProtocolTypeRouter
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application
from django.urls import re_path

from .sync import consumers

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "umap.settings")
# Initialize Django ASGI application early to ensure the AppRegistry
# is populated before importing code that may import ORM models.
django_asgi_app = get_asgi_application()

urlpatterns = (re_path(r"ws/sync/(?P<map_id>\w+)/$", consumers.SyncConsumer.as_asgi()),)

application = ProtocolTypeRouter(
{
"http": django_asgi_app,
"websocket": AllowedHostsOriginValidator(URLRouter(urlpatterns)),
}
)
23 changes: 0 additions & 23 deletions umap/management/commands/run_websocket_server.py

This file was deleted.

1 change: 1 addition & 0 deletions umap/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,4 @@
WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost")
WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001)
WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001")
CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}}
7 changes: 4 additions & 3 deletions umap/static/umap/js/modules/sync/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class SyncEngine {

start(authToken) {
this.transport = new WebSocketTransport(
this._umap.properties.websocketURI,
Utils.template(this._umap.properties.websocketURI, { id: this._umap.id }),
authToken,
this
)
Expand Down Expand Up @@ -124,7 +124,7 @@ export class SyncEngine {

if (this.offline) return
if (this.transport) {
this.transport.send('OperationMessage', message)
this.transport.send('OperationMessage', { sender: this.uuid, ...message })
}
}

Expand Down Expand Up @@ -176,6 +176,7 @@ export class SyncEngine {
* @param {Object} payload
*/
onOperationMessage(payload) {
if (payload.sender === this.uuid) return
this._operations.storeRemoteOperations([payload])
this._applyOperation(payload)
}
Expand Down Expand Up @@ -483,7 +484,7 @@ export class Operations {
return (
Utils.deepEqual(local.subject, remote.subject) &&
Utils.deepEqual(local.metadata, remote.metadata) &&
(!shouldCheckKey || (shouldCheckKey && local.key == remote.key))
(!shouldCheckKey || (shouldCheckKey && local.key === remote.key))
)
}
}
Expand Down
3 changes: 2 additions & 1 deletion umap/static/umap/js/modules/sync/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export class WebSocketTransport {
this.receiver = messagesReceiver
this.closeRequested = false

this.websocket = new WebSocket(webSocketURI)
this.websocket = new WebSocket(`${webSocketURI}`)

this.websocket.onopen = () => {
this.send('JoinRequest', { token: authToken })
Expand Down Expand Up @@ -49,6 +49,7 @@ export class WebSocketTransport {
}

onMessage(wsMessage) {
console.log(wsMessage)
if (wsMessage.data === 'pong') {
this.pongReceived = true
} else {
Expand Down
Empty file added umap/sync/__init__.py
Empty file.
86 changes: 86 additions & 0 deletions umap/sync/consumers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import logging

from channels.generic.websocket import AsyncWebsocketConsumer
from django.core.signing import TimestampSigner
from pydantic import ValidationError

from .payloads import (
JoinRequest,
JoinResponse,
ListPeersResponse,
OperationMessage,
PeerMessage,
Request,
)


class SyncConsumer(AsyncWebsocketConsumer):
@property
def peers(self):
return self.channel_layer.groups[self.map_id].keys()

async def connect(self):
self.map_id = self.scope["url_route"]["kwargs"]["map_id"]

# Join room group
await self.channel_layer.group_add(self.map_id, self.channel_name)

self.is_authenticated = False
await self.accept()

async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.map_id, self.channel_name)
await self.send_peers_list()

async def send_peers_list(self):
message = ListPeersResponse(peers=self.peers)
await self.broadcast(message.model_dump_json())

async def broadcast(self, message):
# Send to all channels (including sender!)
await self.channel_layer.group_send(
self.map_id, {"message": message, "type": "on_message"}
)

async def send_to(self, channel, message):
# Send to one given channel
await self.channel_layer.send(
channel, {"message": message, "type": "on_message"}
)

async def on_message(self, event):
# Send to self channel
await self.send(event["message"])

async def receive(self, text_data):
if not self.is_authenticated:
message = JoinRequest.model_validate_json(text_data)
signed = TimestampSigner().unsign_object(message.token, max_age=30)
user, map_id, permissions = signed.values()
if "edit" not in permissions:
return await self.disconnect()
response = JoinResponse(uuid=self.channel_name, peers=self.peers)
await self.send(response.model_dump_json())
await self.send_peers_list()
self.is_authenticated = True
return

if text_data == "ping":
return await self.send("pong")

try:
incoming = Request.model_validate_json(text_data)
except ValidationError as error:
message = (
f"An error occurred when receiving the following message: {text_data!r}"
)
logging.error(message, error)
else:
match incoming.root:
# Broadcast all operation messages to connected peers
case OperationMessage():
await self.broadcast(text_data)

# Send peer messages to the proper peer
case PeerMessage():
await self.send_to(incoming.root.recipient, text_data)
47 changes: 47 additions & 0 deletions umap/sync/payloads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Literal, Optional, Union

from pydantic import BaseModel, Field, RootModel


class JoinRequest(BaseModel):
kind: Literal["JoinRequest"] = "JoinRequest"
token: str


class OperationMessage(BaseModel):
"""Message sent from one peer to all the others"""

kind: Literal["OperationMessage"] = "OperationMessage"
verb: Literal["upsert", "update", "delete"]
subject: Literal["map", "datalayer", "feature"]
metadata: Optional[dict] = None
key: Optional[str] = None


class PeerMessage(BaseModel):
"""Message sent from a specific peer to another one"""

kind: Literal["PeerMessage"] = "PeerMessage"
sender: str
recipient: str
# The message can be whatever the peers want. It's not checked by the server.
message: dict


class Request(RootModel):
"""Any message coming from the websocket should be one of these, and will be rejected otherwise."""

root: Union[PeerMessage, OperationMessage] = Field(discriminator="kind")


class JoinResponse(BaseModel):
"""Server response containing the list of peers"""

kind: Literal["JoinResponse"] = "JoinResponse"
peers: list
uuid: str


class ListPeersResponse(BaseModel):
kind: Literal["ListPeersResponse"] = "ListPeersResponse"
peers: list
13 changes: 13 additions & 0 deletions umap/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path

import pytest
from channels.testing import ChannelsLiveServerTestCase
from playwright.sync_api import expect

from ..base import mock_tiles
Expand Down Expand Up @@ -87,3 +88,15 @@ def websocket_server():
yield ds_proc
# Shut it down at the end of the pytest session
ds_proc.terminate()


@pytest.fixture(scope="function")
def channels_live_server(request, settings):
server = ChannelsLiveServerTestCase()
server.serve_static = False
server._pre_setup()
settings.WEBSOCKET_FRONT_URI = f"{server.live_server_ws_url}/ws/sync/{{id}}/"

yield server

server._post_teardown()
16 changes: 8 additions & 8 deletions umap/tests/integration/test_websocket_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

@pytest.mark.xdist_group(name="websockets")
def test_websocket_connection_can_sync_markers(
new_page, live_server, websocket_server, tilelayer
new_page, live_server, channels_live_server, tilelayer
):
map = MapFactory(name="sync", edit_status=Map.ANONYMOUS)
map.settings["properties"]["syncEnabled"] = True
Expand Down Expand Up @@ -80,7 +80,7 @@ def test_websocket_connection_can_sync_markers(

@pytest.mark.xdist_group(name="websockets")
def test_websocket_connection_can_sync_polygons(
context, live_server, websocket_server, tilelayer
context, live_server, channels_live_server, tilelayer
):
map = MapFactory(name="sync", edit_status=Map.ANONYMOUS)
map.settings["properties"]["syncEnabled"] = True
Expand Down Expand Up @@ -164,7 +164,7 @@ def test_websocket_connection_can_sync_polygons(

@pytest.mark.xdist_group(name="websockets")
def test_websocket_connection_can_sync_map_properties(
new_page, live_server, websocket_server, tilelayer
new_page, live_server, channels_live_server, tilelayer
):
map = MapFactory(name="sync", edit_status=Map.ANONYMOUS)
map.settings["properties"]["syncEnabled"] = True
Expand Down Expand Up @@ -196,7 +196,7 @@ def test_websocket_connection_can_sync_map_properties(

@pytest.mark.xdist_group(name="websockets")
def test_websocket_connection_can_sync_datalayer_properties(
new_page, live_server, websocket_server, tilelayer
new_page, live_server, channels_live_server, tilelayer
):
map = MapFactory(name="sync", edit_status=Map.ANONYMOUS)
map.settings["properties"]["syncEnabled"] = True
Expand Down Expand Up @@ -225,7 +225,7 @@ def test_websocket_connection_can_sync_datalayer_properties(

@pytest.mark.xdist_group(name="websockets")
def test_websocket_connection_can_sync_cloned_polygons(
context, live_server, websocket_server, tilelayer
context, live_server, channels_live_server, tilelayer
):
map = MapFactory(name="sync", edit_status=Map.ANONYMOUS)
map.settings["properties"]["syncEnabled"] = True
Expand Down Expand Up @@ -288,7 +288,7 @@ def test_websocket_connection_can_sync_cloned_polygons(

@pytest.mark.xdist_group(name="websockets")
def test_websocket_connection_can_sync_late_joining_peer(
new_page, live_server, websocket_server, tilelayer
new_page, live_server, channels_live_server, tilelayer
):
map = MapFactory(name="sync", edit_status=Map.ANONYMOUS)
map.settings["properties"]["syncEnabled"] = True
Expand Down Expand Up @@ -349,7 +349,7 @@ def test_websocket_connection_can_sync_late_joining_peer(


@pytest.mark.xdist_group(name="websockets")
def test_should_sync_datalayers(new_page, live_server, websocket_server, tilelayer):
def test_should_sync_datalayers(new_page, live_server, channels_live_server, tilelayer):
map = MapFactory(name="sync", edit_status=Map.ANONYMOUS)
map.settings["properties"]["syncEnabled"] = True
map.save()
Expand Down Expand Up @@ -422,7 +422,7 @@ def test_should_sync_datalayers(new_page, live_server, websocket_server, tilelay

@pytest.mark.xdist_group(name="websockets")
def test_create_and_sync_map(
new_page, live_server, websocket_server, tilelayer, login, user
new_page, live_server, channels_live_server, tilelayer, login, user
):
# Create a syncable map with peerA
peerA = login(user, prefix="Page A")
Expand Down
22 changes: 0 additions & 22 deletions umap/tests/test_websocket_server.py

This file was deleted.

Loading
Loading