From ae8227c812c311fea7a881300eeee655b0b7740a Mon Sep 17 00:00:00 2001 From: Tim Paine Date: Mon, 30 May 2022 16:44:23 -0400 Subject: [PATCH] Implement starlette test client websocket wrapper, for testing purposes only. Websocket wrapper is synchronous so won't play nicely with perspective outside of a testing context --- .../perspective/client/__init__.py | 4 +- .../perspective/perspective/client/client.py | 1 - .../perspective/client/starlette_test.py | 69 ++++ .../test_starlette_handler.py | 368 +++++++++--------- 4 files changed, 260 insertions(+), 182 deletions(-) create mode 100644 python/perspective/perspective/client/starlette_test.py diff --git a/python/perspective/perspective/client/__init__.py b/python/perspective/perspective/client/__init__.py index 2a42494f6e..c8380b3819 100644 --- a/python/perspective/perspective/client/__init__.py +++ b/python/perspective/perspective/client/__init__.py @@ -9,11 +9,11 @@ from .client import PerspectiveClient try: - from .tornado import PerspectiveTornadoClient, websocket as tornado_websocket + from .aiohttp import PerspectiveAIOHTTPClient, websocket as aiohttp_websocket except ImportError: ... try: - from .aiohttp import PerspectiveAIOHTTPClient, websocket as aiohttp_websocket + from .tornado import PerspectiveTornadoClient, websocket as tornado_websocket except ImportError: ... diff --git a/python/perspective/perspective/client/client.py b/python/perspective/perspective/client/client.py index 918c4c069a..d1f42533bc 100644 --- a/python/perspective/perspective/client/client.py +++ b/python/perspective/perspective/client/client.py @@ -53,7 +53,6 @@ def _handle(self, msg): return handler = self._handlers.get(msg["data"].get("id")) - if handler: future = handler.get("future", None) keep_alive = handler.get("keep_alive", False) diff --git a/python/perspective/perspective/client/starlette_test.py b/python/perspective/perspective/client/starlette_test.py new file mode 100644 index 0000000000..98bbd48811 --- /dev/null +++ b/python/perspective/perspective/client/starlette_test.py @@ -0,0 +1,69 @@ +################################################################################ +# +# Copyright (c) 2022, the Perspective Authors. +# +# This file is part of the Perspective library, distributed under the terms of +# the Apache License 2.0. The full license can be found in the LICENSE file. +# + +from starlette.testclient import TestClient +from starlette.websockets import WebSocketDisconnect + +from .websocket import PerspectiveWebsocketClient, PerspectiveWebsocketConnection, Periodic + + +class _StarletteTestPeriodic(Periodic): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Dont do anything as this should only ever be used in tests + + async def start(self): + # Dont do anything as this should only ever be used in tests + ... + + async def stop(self): + # Dont do anything as this should only ever be used in tests + ... + + +class _PerspectiveStarletteWebsocketConnection(PerspectiveWebsocketConnection): + def __init__(self, client: TestClient): + self._client = client + self._ws = None + self._on_message = None + + async def connect(self, url, on_message, max_message_size) -> None: + self._ws = self._client.websocket_connect(url).__enter__() + self._on_message = on_message + + def periodic(self, callback, interval) -> Periodic: + return _StarletteTestPeriodic(callback=callback, interval=interval) + + async def write(self, message, binary=False, wait=True): + if binary: + self._ws.send_bytes(message) + else: + self._ws.send_text(message) + + # read back message + self._on_message(self._ws.receive_text()) + + async def close(self): + try: + self._ws.__exit__() + except WebSocketDisconnect: + return + + +class _PerspectiveStarletteTestClient(PerspectiveWebsocketClient): + def __init__(self, test_client: TestClient): + """Create a `PerspectiveStarletteTestClient` that interfaces with a Perspective server over a Websocket""" + super(_PerspectiveStarletteTestClient, self).__init__(_PerspectiveStarletteWebsocketConnection(test_client)) + + +async def websocket(test_client: TestClient, url: str): + """Create a new websocket client at the given `url` using the thread current + tornado loop.""" + client = _PerspectiveStarletteTestClient(test_client) + await client.connect(url) + return client diff --git a/python/perspective/perspective/tests/starlette_handler/test_starlette_handler.py b/python/perspective/perspective/tests/starlette_handler/test_starlette_handler.py index c8edb46029..297238cbb3 100644 --- a/python/perspective/perspective/tests/starlette_handler/test_starlette_handler.py +++ b/python/perspective/perspective/tests/starlette_handler/test_starlette_handler.py @@ -5,7 +5,6 @@ # This file is part of the Perspective library, distributed under the terms of # the Apache License 2.0. The full license can be found in the LICENSE file. # -import os.path import pytest import random @@ -16,11 +15,11 @@ from perspective import Table, PerspectiveManager, PerspectiveStarletteHandler +from ...client.starlette_test import _PerspectiveStarletteTestClient, websocket from ...core.exception import PerspectiveError -from ...table import Table from ...manager import PerspectiveManager from ...starlette_handler import PerspectiveStarletteHandler -from ...client.aiohttp import websocket +from ...table import Table data = { @@ -47,254 +46,265 @@ def setup_method(self): MANAGER._tables = {} MANAGER._views = {} - def test_starlette_handler_init_terminate(self, app, http_client, http_port): + async def websocket_client(self): + """Connect and initialize a websocket client connection to the + Perspective tornado server. + """ + return await websocket(CLIENT, "/websocket") + + @pytest.mark.asyncio + async def test_starlette_handler_init_terminate(self): """Using FastAPI's builtin test client, test the websocket provided by PerspectiveStarletteHandler. All test methods must import `app`, `http_client`, and `http_port`, otherwise a mysterious timeout will occur.""" - with CLIENT.websocket_connect("/websocket") as websocket: - ... + client = await self.websocket_client() + await client.terminate() - def test_starlette_handler_table_method(self): + @pytest.mark.asyncio + async def test_starlette_handler_table_method(self): table_name = str(random.random()) _table = Table(data) MANAGER.host_table(table_name, _table) - with CLIENT.websocket_connect("/websocket") as websocket: - client = yield self.websocket_client(http_port) + client = await self.websocket_client() table = client.open_table(table_name) - schema = yield table.schema() - size = yield table.size() + schema = await table.schema() + # print(7) + # size = await table.size() - assert schema == { - "a": "integer", - "b": "float", - "c": "string", - "d": "datetime", - } + # print(8) + # assert schema == { + # "a": "integer", + # "b": "float", + # "c": "string", + # "d": "datetime", + # } - assert size == 10 + # assert size == 10 - @pytest.mark.gen_test(run_sync=False) - def test_tornado_handler_make_table(self, app, http_client, http_port): - client = yield self.websocket_client(http_port) - table = yield client.table(data) - size = yield table.size() + await client.terminate() - assert size == 10 + # @pytest.mark.gen_test(run_sync=False) + # def test_tornado_handler_make_table(self, app, http_client, http_port): + # client = yield self.websocket_client(http_port) + # table = yield client.table(data) + # size = yield table.size() - table.update(data) + # assert size == 10 - size2 = yield table.size() - assert size2 == 20 + # table.update(data) - @pytest.mark.gen_test(run_sync=False) - def test_tornado_handler_table_update(self, app, http_client, http_port): - table_name = str(random.random()) - _table = Table(data) - MANAGER.host_table(table_name, _table) + # size2 = yield table.size() + # assert size2 == 20 - client = yield self.websocket_client(http_port) - table = client.open_table(table_name) - size = yield table.size() + # @pytest.mark.gen_test(run_sync=False) + # def test_tornado_handler_table_update(self, app, http_client, http_port): + # table_name = str(random.random()) + # _table = Table(data) + # MANAGER.host_table(table_name, _table) - assert size == 10 + # client = yield self.websocket_client(http_port) + # table = client.open_table(table_name) + # size = yield table.size() - table.update(data) + # assert size == 10 - size2 = yield table.size() - assert size2 == 20 + # table.update(data) - @pytest.mark.gen_test(run_sync=False) - def test_tornado_handler_table_update_port( - self, app, http_client, http_port, sentinel - ): - table_name = str(random.random()) - _table = Table(data) - MANAGER.host_table(table_name, _table) + # size2 = yield table.size() + # assert size2 == 20 - client = yield self.websocket_client(http_port) - table = client.open_table(table_name) - view = yield table.view() + # @pytest.mark.gen_test(run_sync=False) + # def test_tornado_handler_table_update_port( + # self, app, http_client, http_port, sentinel + # ): + # table_name = str(random.random()) + # _table = Table(data) + # MANAGER.host_table(table_name, _table) - size = yield table.size() + # client = yield self.websocket_client(http_port) + # table = client.open_table(table_name) + # view = yield table.view() - assert size == 10 + # size = yield table.size() - for i in range(5): - yield table.make_port() + # assert size == 10 - port = yield table.make_port() + # for i in range(5): + # yield table.make_port() - s = sentinel(False) + # port = yield table.make_port() - def updater(port_id): - s.set(True) - assert port_id == port + # s = sentinel(False) - view.on_update(updater) + # def updater(port_id): + # s.set(True) + # assert port_id == port - table.update(data, port_id=port) + # view.on_update(updater) - size2 = yield table.size() - assert size2 == 20 - assert s.get() is True + # table.update(data, port_id=port) - @pytest.mark.gen_test(run_sync=False) - def test_tornado_handler_table_update_row_delta( - self, app, http_client, http_port, sentinel - ): - table_name = str(random.random()) - _table = Table(data) - MANAGER.host_table(table_name, _table) + # size2 = yield table.size() + # assert size2 == 20 + # assert s.get() is True - client = yield self.websocket_client(http_port) - table = client.open_table(table_name) - view = yield table.view() + # @pytest.mark.gen_test(run_sync=False) + # def test_tornado_handler_table_update_row_delta( + # self, app, http_client, http_port, sentinel + # ): + # table_name = str(random.random()) + # _table = Table(data) + # MANAGER.host_table(table_name, _table) - size = yield table.size() + # client = yield self.websocket_client(http_port) + # table = client.open_table(table_name) + # view = yield table.view() - assert size == 10 + # size = yield table.size() - s = sentinel(False) + # assert size == 10 - def updater(port_id, delta): - s.set(True) - t2 = Table(delta) - assert t2.view().to_dict() == data - assert port_id == 0 + # s = sentinel(False) - view.on_update(updater, mode="row") + # def updater(port_id, delta): + # s.set(True) + # t2 = Table(delta) + # assert t2.view().to_dict() == data + # assert port_id == 0 - table.update(data) + # view.on_update(updater, mode="row") - size2 = yield table.size() - assert size2 == 20 - assert s.get() is True + # table.update(data) - @pytest.mark.gen_test(run_sync=False) - def test_tornado_handler_table_update_row_delta_port( - self, app, http_client, http_port, sentinel - ): - table_name = str(random.random()) - _table = Table(data) - MANAGER.host_table(table_name, _table) + # size2 = yield table.size() + # assert size2 == 20 + # assert s.get() is True - client = yield self.websocket_client(http_port) - table = client.open_table(table_name) - view = yield table.view() + # @pytest.mark.gen_test(run_sync=False) + # def test_tornado_handler_table_update_row_delta_port( + # self, app, http_client, http_port, sentinel + # ): + # table_name = str(random.random()) + # _table = Table(data) + # MANAGER.host_table(table_name, _table) - size = yield table.size() + # client = yield self.websocket_client(http_port) + # table = client.open_table(table_name) + # view = yield table.view() - assert size == 10 + # size = yield table.size() - for i in range(5): - yield table.make_port() + # assert size == 10 - port = yield table.make_port() + # for i in range(5): + # yield table.make_port() - s = sentinel(False) + # port = yield table.make_port() - def updater(port_id, delta): - s.set(True) - t2 = Table(delta) - assert t2.view().to_dict() == data - assert port_id == port + # s = sentinel(False) - view.on_update(updater, mode="row") + # def updater(port_id, delta): + # s.set(True) + # t2 = Table(delta) + # assert t2.view().to_dict() == data + # assert port_id == port - table.update(data, port_id=port) + # view.on_update(updater, mode="row") - size2 = yield table.size() - assert size2 == 20 - assert s.get() is True + # table.update(data, port_id=port) - @pytest.mark.gen_test(run_sync=False) - def test_tornado_handler_table_remove(self, app, http_client, http_port): - table_name = str(random.random()) - _table = Table(data, index="a") - MANAGER.host_table(table_name, _table) + # size2 = yield table.size() + # assert size2 == 20 + # assert s.get() is True - client = yield self.websocket_client(http_port) - table = client.open_table(table_name) - size = yield table.size() + # @pytest.mark.gen_test(run_sync=False) + # def test_tornado_handler_table_remove(self, app, http_client, http_port): + # table_name = str(random.random()) + # _table = Table(data, index="a") + # MANAGER.host_table(table_name, _table) - assert size == 10 + # client = yield self.websocket_client(http_port) + # table = client.open_table(table_name) + # size = yield table.size() - table.remove([i for i in range(5)]) + # assert size == 10 - view = yield table.view(columns=["a"]) - output = yield view.to_dict() + # table.remove([i for i in range(5)]) - assert output == {"a": [i for i in range(5, 10)]} + # view = yield table.view(columns=["a"]) + # output = yield view.to_dict() - @pytest.mark.gen_test(run_sync=False) - def test_tornado_handler_create_view( - self, app, http_client, http_port, sentinel - ): - table_name = str(random.random()) - _table = Table(data) - MANAGER.host_table(table_name, _table) + # assert output == {"a": [i for i in range(5, 10)]} - client = yield self.websocket_client(http_port) - table = client.open_table(table_name) - view = yield table.view(columns=["a"]) - output = yield view.to_dict() + # @pytest.mark.gen_test(run_sync=False) + # def test_tornado_handler_create_view( + # self, app, http_client, http_port, sentinel + # ): + # table_name = str(random.random()) + # _table = Table(data) + # MANAGER.host_table(table_name, _table) - assert output == { - "a": [i for i in range(10)], - } + # client = yield self.websocket_client(http_port) + # table = client.open_table(table_name) + # view = yield table.view(columns=["a"]) + # output = yield view.to_dict() - @pytest.mark.gen_test(run_sync=False) - def test_tornado_handler_create_view_errors( - self, app, http_client, http_port, sentinel - ): - table_name = str(random.random()) - _table = Table(data) - MANAGER.host_table(table_name, _table) + # assert output == { + # "a": [i for i in range(10)], + # } - client = yield self.websocket_client(http_port) - table = client.open_table(table_name) + # @pytest.mark.gen_test(run_sync=False) + # def test_tornado_handler_create_view_errors( + # self, app, http_client, http_port, sentinel + # ): + # table_name = str(random.random()) + # _table = Table(data) + # MANAGER.host_table(table_name, _table) - with pytest.raises(PerspectiveError) as exc: - yield table.view(columns=["abcde"]) + # client = yield self.websocket_client(http_port) + # table = client.open_table(table_name) - assert str(exc.value) == "Invalid column 'abcde' found in View columns.\n" + # with pytest.raises(PerspectiveError) as exc: + # yield table.view(columns=["abcde"]) - @pytest.mark.gen_test(run_sync=False) - def test_tornado_handler_create_view_to_arrow( - self, app, http_client, http_port, sentinel - ): - table_name = str(random.random()) - _table = Table(data) - MANAGER.host_table(table_name, _table) + # assert str(exc.value) == "Invalid column 'abcde' found in View columns.\n" - client = yield self.websocket_client(http_port) - table = client.open_table(table_name) - view = yield table.view() - output = yield view.to_arrow() - expected = yield table.schema() + # @pytest.mark.gen_test(run_sync=False) + # def test_tornado_handler_create_view_to_arrow( + # self, app, http_client, http_port, sentinel + # ): + # table_name = str(random.random()) + # _table = Table(data) + # MANAGER.host_table(table_name, _table) - assert Table(output).schema(as_string=True) == expected + # client = yield self.websocket_client(http_port) + # table = client.open_table(table_name) + # view = yield table.view() + # output = yield view.to_arrow() + # expected = yield table.schema() - @pytest.mark.gen_test(run_sync=False) - def test_tornado_handler_create_view_to_arrow_update( - self, app, http_client, http_port, sentinel - ): - table_name = str(random.random()) - _table = Table(data) - MANAGER.host_table(table_name, _table) + # assert Table(output).schema(as_string=True) == expected - client = yield self.websocket_client(http_port) - table = client.open_table(table_name) - view = yield table.view() + # @pytest.mark.gen_test(run_sync=False) + # def test_tornado_handler_create_view_to_arrow_update( + # self, app, http_client, http_port, sentinel + # ): + # table_name = str(random.random()) + # _table = Table(data) + # MANAGER.host_table(table_name, _table) + + # client = yield self.websocket_client(http_port) + # table = client.open_table(table_name) + # view = yield table.view() - output = yield view.to_arrow() + # output = yield view.to_arrow() - for i in range(10): - table.update(output) + # for i in range(10): + # table.update(output) - size2 = yield table.size() - assert size2 == 110 + # size2 = yield table.size() + # assert size2 == 110