Skip to content

Commit

Permalink
Add starlette manager and examples for embedding perspective in a
Browse files Browse the repository at this point in the history
starlette server

Asyncify the perspective client, abstract away the websocket handler into its own class and utility classes

Implement starlette test client websocket wrapper, for testing purposes only. Websocket wrapper is synchronous so won't play nicely with perspective outside of a testing context

fix lint
  • Loading branch information
nickpatstew authored and timkpaine committed Jun 6, 2022
1 parent a8589d8 commit 5435ba0
Show file tree
Hide file tree
Showing 53 changed files with 1,452 additions and 268 deletions.
7 changes: 7 additions & 0 deletions examples/python-starlette/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# starlette-python

This example contains a simple `perspective-python` folder that uses Starlette/FastAPI to serve a static dataset to the user through various [data bindings](https://perspective.finos.org/docs/md/server.html):

- `index.html`: a [client/server replicated](https://perspective.finos.org/docs/md/server.html#clientserver-replicated) setup that synchronizes the client and server data using Apache Arrow.
- `server_mode.html`: a [server-only](https://perspective.finos.org/docs/md/server.html#server-only) setup that reads data and performs operations directly on the server using commands sent through the Websocket.
- `client_server_editing`: a client-server replicated setup that also enables editing, where edits from multiple clients are applied properly to the server, and then synchronized back to the clients.
24 changes: 24 additions & 0 deletions examples/python-starlette/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "python-starlette",
"private": true,
"version": "1.3.13",
"description": "An example of editing a `perspective-python` server from the browser.",
"scripts": {
"start": "PYTHONPATH=../../python/perspective python3 server.py"
},
"keywords": [],
"license": "Apache-2.0",
"dependencies": {
"@finos/perspective": "^1.3.13",
"@finos/perspective-viewer": "^1.3.13",
"@finos/perspective-viewer-d3fc": "^1.3.13",
"@finos/perspective-viewer-datagrid": "^1.3.13",
"@finos/perspective-workspace": "^1.3.13",
"superstore-arrow": "^1.0.0"
},
"devDependencies": {
"@finos/perspective-webpack-plugin": "^1.3.13",
"npm-run-all": "^4.1.3",
"rimraf": "^2.5.2"
}
}
77 changes: 77 additions & 0 deletions examples/python-starlette/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
################################################################################
#
# Copyright (c) 2022, the Perspective Authors.
#
# This file is part of the Perspective library, distributed under the terms of
# the Apache License 2.0. The full license can be found in the LICENSE file.
#
import asyncio
import os
import os.path
import logging
import threading
import uvicorn

from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import FileResponse
from starlette.staticfiles import StaticFiles

from perspective import Table, PerspectiveManager, PerspectiveStarletteHandler


here = os.path.abspath(os.path.dirname(__file__))
file_path = os.path.join(
here, "..", "..", "node_modules", "superstore-arrow", "superstore.arrow"
)


def static_nodemodules_handler(rest_of_path):
if rest_of_path.startswith("@finos"):
return FileResponse("../../node_modules/{}".format(rest_of_path))
return FileResponse("../../node_modules/@finos/{}".format(rest_of_path))


def perspective_thread(manager):
"""Perspective application thread starts its own event loop, and
adds the table with the name "data_source_one", which will be used
in the front-end."""
psp_loop = asyncio.new_event_loop()
manager.set_loop_callback(psp_loop.call_soon_threadsafe)
with open(file_path, mode="rb") as file:
table = Table(file.read(), index="Row ID")
manager.host_table("data_source_one", table)
psp_loop.run_forever()

def make_app():
manager = PerspectiveManager()

thread = threading.Thread(target=perspective_thread, args=(manager,))
thread.daemon = True
thread.start()

async def websocket_handler(websocket: WebSocket):
handler = PerspectiveStarletteHandler(manager, websocket)
await handler.run()

# static_html_files = StaticFiles(directory="../python-tornado", html=True)
static_html_files = StaticFiles(directory="../python-tornado", html=True)

app = FastAPI()
app.add_api_websocket_route('/websocket', websocket_handler)
app.get("/node_modules/{rest_of_path:path}")(static_nodemodules_handler)
app.mount("/", static_html_files)

app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
return app

if __name__ == "__main__":
app = make_app()
logging.critical("Listening on http://localhost:8080")
uvicorn.run(app, host='0.0.0.0', port=8080)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "tornado-streaming-python",
"name": "python-tornado-streaming",
"private": true,
"version": "1.3.13",
"description": "An example of streaming a `perspective-python` server to the browser.",
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# tornado-python
# python-tornado

This example contains a simple `perspective-python` folder that uses Tornado to serve a static dataset to the user through various [data bindings](https://perspective.finos.org/docs/md/server.html):

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "tornado-python",
"name": "python-tornado",
"private": true,
"version": "1.3.13",
"description": "An example of editing a `perspective-python` server from the browser.",
Expand Down
File renamed without changes.
File renamed without changes.
10 changes: 6 additions & 4 deletions python/perspective/perspective/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
# the Apache License 2.0. The full license can be found in the LICENSE file.
#

from .libpsp import * # noqa: F401, F403
from .core import * # noqa: F401, F403
from .core._version import __version__ # noqa: F401
from .widget import * # noqa: F401, F403
from .libpsp import *
from .core import *
from .core._version import __version__
from .client import *
from .handlers import *
from .widget import *
12 changes: 10 additions & 2 deletions python/perspective/perspective/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@
# the Apache License 2.0. The full license can be found in the LICENSE file.
#

from .client import PerspectiveClient # noqa: F401
from .client import PerspectiveClient

__all__ = ["PerspectiveClient"]
try:
from .aiohttp import PerspectiveAIOHTTPClient, websocket as aiohttp_websocket
except ImportError:
...

try:
from .tornado import PerspectiveTornadoClient, websocket as tornado_websocket
except ImportError:
...
7 changes: 7 additions & 0 deletions python/perspective/perspective/client/aiohttp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
################################################################################
#
# Copyright (c) 2022, the Perspective Authors.
#
# This file is part of the Perspective library, distributed under the terms of
# the Apache License 2.0. The full license can be found in the LICENSE file.
#
4 changes: 2 additions & 2 deletions python/perspective/perspective/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# This file is part of the Perspective library, distributed under the terms of
# the Apache License 2.0. The full license can be found in the LICENSE file.
#
import asyncio

from random import random

Expand Down Expand Up @@ -52,7 +53,6 @@ def _handle(self, msg):
return

handler = self._handlers.get(msg["data"].get("id"))

if handler:
future = handler.get("future", None)
keep_alive = handler.get("keep_alive", False)
Expand Down Expand Up @@ -124,7 +124,7 @@ def post(self, msg, future=None, keep_alive=False):

msg["id"] = self._msg_id

self.send(msg)
return asyncio.ensure_future(self.send(msg))

def table(self, data, index=None, limit=None, name=None):
"""Create a new `Table` in the server implementation, and return
Expand Down
4 changes: 2 additions & 2 deletions python/perspective/perspective/client/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# the Apache License 2.0. The full license can be found in the LICENSE file.
#

import tornado
import asyncio


def async_queue(client, name, method, cmd, *args, **kwargs):
Expand All @@ -26,7 +26,7 @@ def async_queue(client, name, method, cmd, *args, **kwargs):
"subscribe": False,
}

future = tornado.concurrent.Future()
future = asyncio.Future()
client.post(msg, future)
return future

Expand Down
111 changes: 111 additions & 0 deletions python/perspective/perspective/client/starlette_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
################################################################################
#
# Copyright (c) 2022, the Perspective Authors.
#
# This file is part of the Perspective library, distributed under the terms of
# the Apache License 2.0. The full license can be found in the LICENSE file.
#

from starlette.testclient import TestClient
from starlette.websockets import WebSocketDisconnect

from queue import Empty

from .websocket import (
PerspectiveWebsocketClient,
PerspectiveWebsocketConnection,
Periodic,
)


class _StarletteTestPeriodic(Periodic):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Dont do anything as this should only ever be used in tests

async def start(self):
# Dont do anything as this should only ever be used in tests
...

async def stop(self):
# Dont do anything as this should only ever be used in tests
...


class _PerspectiveStarletteWebsocketConnection(PerspectiveWebsocketConnection):
def __init__(self, client: TestClient):
"""A Starlette Websocket client.
NOTE: For use in tests only!
Args:
client (TestClient): starlette TestClient instance
"""
self._client = client
self._ws = None
self._on_message = None
self._send_cache = None

async def connect(self, url, on_message, max_message_size) -> None:
self._ws = self._client.websocket_connect(url).__enter__()
self._on_message = on_message

def periodic(self, callback, interval) -> Periodic:
return _StarletteTestPeriodic(callback=callback, interval=interval)

def _on_message_internal(self):
# This is a hacky workaround for the test client's fundamentally synchronous nature
attempt = 0
try_count = 5
while attempt < try_count:
try:
while True:
# increase to avoid races
message = self._ws._send_queue.get(timeout=0.01)

if isinstance(message, BaseException):
raise message

self._ws._raise_on_close(message)

if "text" in message:
self._on_message(message["text"])
if "bytes" in message:
self._on_message(message["bytes"])

except Empty:
attempt += 1

async def write(self, message, binary=False):
# read back message
self._on_message_internal()

if binary:
self._ws.send_bytes(message)
else:
self._ws.send_text(message)

# read back message
self._on_message_internal()

async def close(self):
try:
self._ws.__exit__()
except WebSocketDisconnect:
return


class _PerspectiveStarletteTestClient(PerspectiveWebsocketClient):
def __init__(self, test_client: TestClient):
"""Create a `PerspectiveStarletteTestClient` that interfaces with a Perspective server over a Websocket"""
super(_PerspectiveStarletteTestClient, self).__init__(
_PerspectiveStarletteWebsocketConnection(test_client)
)


async def websocket(test_client: TestClient, url: str):
"""Create a new websocket client at the given `url` using the thread current
tornado loop."""
client = _PerspectiveStarletteTestClient(test_client)
await client.connect(url)
return client
8 changes: 4 additions & 4 deletions python/perspective/perspective/client/table_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# the Apache License 2.0. The full license can be found in the LICENSE file.
#

import tornado
import asyncio

from functools import partial
from .dispatch import async_queue, subscribe, unsubscribe
Expand All @@ -27,7 +27,7 @@ def table(client, data, name, index=None, limit=None):

msg = {"cmd": "table", "name": name, "args": [data], "options": options}

future = tornado.concurrent.Future()
future = asyncio.Future()
client.post(msg, future)
return future

Expand Down Expand Up @@ -128,7 +128,7 @@ def update(self, data, port_id=0):
"args": [data, {"port_id": port_id}],
"subscribe": False,
}
self._client.post(msg)
return self._client.post(msg)

def remove(self, pkeys, port_id=0):
msg = {
Expand All @@ -138,4 +138,4 @@ def remove(self, pkeys, port_id=0):
"args": [pkeys, {"port_id": port_id}],
"subscribe": False,
}
self._client.post(msg)
return self._client.post(msg)
Loading

0 comments on commit 5435ba0

Please sign in to comment.