-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add PerspectiveManager remote API for Python, Tornado server example #743
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
<!-- | ||
|
||
Copyright (c) 2017, the Perspective Authors. | ||
|
||
This file is part of the Perspective library, distributed under the terms of | ||
the Apache License 2.0. The full license can be found in the LICENSE file. | ||
|
||
--> | ||
|
||
<!DOCTYPE html> | ||
<html> | ||
|
||
<head> | ||
|
||
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1, minimum-scale=1, user-scalable=no"> | ||
|
||
|
||
<script src="https://unpkg.com/@finos/[email protected]/dist/umd/perspective-viewer.js"></script> | ||
<script src="https://unpkg.com/@finos/[email protected]/dist/umd/perspective-viewer-hypergrid.js"></script> | ||
<script src="https://unpkg.com/@finos/[email protected]/dist/umd/perspective-viewer-d3fc.js"></script> | ||
|
||
<script src="https://unpkg.com/@finos/[email protected]/dist/umd/perspective.js"></script> | ||
|
||
<link rel='stylesheet' href="https://unpkg.com/@finos/perspective-viewer/dist/umd/material.dark.css"> | ||
|
||
<style> | ||
perspective-viewer{position:absolute;top:0;left:0;right:0;bottom:0;} | ||
</style> | ||
|
||
</head> | ||
|
||
<body> | ||
|
||
<!-- | ||
Clicking on cells in the grid and typing will fire edits back into the dataframe in memory. | ||
|
||
If you open another tab and navigate to the same URL, edits will appear in all the windows in real time. | ||
|
||
This allows collaborative editing of the underlying dataframe, with all updates propagated automatically to all clients. | ||
--> | ||
<perspective-viewer | ||
id="viewer" | ||
editable> | ||
|
||
</perspective-viewer> | ||
|
||
<script> | ||
|
||
window.addEventListener('WebComponentsReady', async function() { | ||
|
||
// Create a client that expects a Perspective server to accept connections at the specified URL. | ||
const websocket = perspective.websocket("ws://localhost:8888/websocket"); | ||
|
||
/* `table` is a proxy for the `Table` we created on the server. | ||
|
||
All operations that are possible through the Javascript API are possible on the Python API as well, | ||
thus calling `view()`, `schema()`, `update()` etc on `const table` will pass those operations to the | ||
Python `Table`, execute the commands, and return the result back to Javascript. | ||
*/ | ||
const table = websocket.open_table('data_source_one'); | ||
|
||
// Load this in the `<perspective-viewer>`. | ||
document.getElementById('viewer').load(table); | ||
}); | ||
|
||
</script> | ||
|
||
</body> | ||
|
||
</html> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
from perspective import Table, PerspectiveManager | ||
from datetime import date, datetime | ||
import json | ||
import tornado.websocket | ||
import tornado.web | ||
import tornado.ioloop | ||
import pandas as pd | ||
import sys | ||
import os | ||
sys.path.insert(1, os.path.join(sys.path[0], '..')) | ||
|
||
''' | ||
Import the Table and PerspectiveManager classes. | ||
|
||
A Perspective `Table` is instantiated either with data or a `schema`. | ||
|
||
The `PerspectiveManager` class handles incoming messages from the client Perspective through a WebSocket connection. | ||
''' | ||
|
||
|
||
class DateTimeEncoder(json.JSONEncoder): | ||
'''Create a custom JSON encoder that allows serialization of datetime and date objects.''' | ||
|
||
def default(self, obj): | ||
if isinstance(obj, datetime): | ||
# Convert to milliseconds - perspective.js expects millisecond timestamps, but python generates them in seconds. | ||
return obj.timestamp() * 1000 | ||
return super(DateTimeEncoder, self).default(obj) | ||
|
||
|
||
class MainHandler(tornado.web.RequestHandler): | ||
|
||
def set_default_headers(self): | ||
self.set_header("Access-Control-Allow-Origin", "*") | ||
self.set_header("Access-Control-Allow-Headers", "x-requested-with") | ||
self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS') | ||
|
||
def get(self): | ||
self.render("perspective_tornado_client.html") | ||
|
||
|
||
class SimpleWebSocket(tornado.websocket.WebSocketHandler): | ||
|
||
def on_message(self, message): | ||
'''When the websocket receives a message, send it to the `process` method of the `PerspectiveManager` with a reference to the `post` callback.''' | ||
if message == "heartbeat": | ||
return | ||
message = json.loads(message) | ||
MANAGER.process(message, self.post) | ||
|
||
def post(self, message): | ||
'''When `post` is called by `PerspectiveManager`, serialize the data to JSON and send it to the client.''' | ||
message = json.dumps(message, cls=DateTimeEncoder) | ||
self.write_message(message) | ||
|
||
|
||
def make_app(): | ||
return tornado.web.Application([ | ||
(r"/", MainHandler), | ||
# create a websocket endpoint that the client Javascript can access | ||
(r"/websocket", SimpleWebSocket) | ||
]) | ||
|
||
|
||
if __name__ == "__main__": | ||
'''Create an instance of the `PerspectiveManager`. | ||
|
||
The manager instance tracks tables and views, manages method calls on them, and parses messages from the client. | ||
''' | ||
MANAGER = PerspectiveManager() | ||
|
||
'''Perspective can load data in row, column, and dataframe format. | ||
|
||
- Row format (list[dict{string:value}]): [{"column_1": 1, "column_2": "abc", "column_3": True, "column_4": datetime.now(), "column_5": date.today()}] | ||
* Each element in the list is a dict, which represents a row of data. | ||
- Column format (dict{string: list}): {"column": [1, 2, 3]} | ||
* The keys of the dict are string column names, and the values are lists that contain the value for each row. | ||
* Numpy arrays can also be used in this format, i.e. {"a": numpy.arange(100)} | ||
- DataFrame (pandas.DataFrame): Perspective has full support for dataframe loading, updating, and editing. | ||
|
||
For this example, we'll load a sample dataframe from a pickle, and provide it to the Table. | ||
''' | ||
tbl = Table(pd.read_pickle("FTSE100.pkl")) | ||
|
||
'''Once the Table is created, pass it to the manager instance with a name. | ||
|
||
Make sure that the name here is used in the client HTML when we call `open_table`. | ||
|
||
Once the manager has the table, commands from the client will be tracked and applied. | ||
''' | ||
MANAGER.host_table("data_source_one", tbl) | ||
|
||
# start the Tornado server | ||
app = make_app() | ||
app.listen(8888) | ||
loop = tornado.ioloop.IOLoop.current() | ||
loop.start() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
# ***************************************************************************** | ||
# | ||
# Copyright (c) 2019, the Perspective Authors. | ||
# | ||
# This file is part of the Perspective library, distributed under the terms of | ||
# the Apache License 2.0. The full license can be found in the LICENSE file. | ||
# | ||
from functools import partial | ||
from .table import Table | ||
from ._exception import PerspectiveError | ||
|
||
|
||
class PerspectiveManager(object): | ||
def __init__(self): | ||
self._tables = {} | ||
self._views = {} | ||
self._callback_cache = {} | ||
|
||
def host_table(self, name, table): | ||
'''Given a reference to a `Table`, manage it and allow operations on it to occur through the Manager.''' | ||
self._tables[name] = table | ||
|
||
def process(self, msg, post_callback): | ||
'''Given a message from the client, process it through the Perspective engine. | ||
|
||
Params: | ||
msg (dict) : a message from the client with instructions that map to engine operations | ||
post_callback (callable) : a function that returns data to the client | ||
''' | ||
if not isinstance(msg, dict): | ||
raise PerspectiveError("Message passed into `process()` should be a dict, i.e. JSON strings should have been deserialized using `json.dumps()`.") | ||
|
||
cmd = msg["cmd"] | ||
|
||
if cmd == "init": | ||
# return empty response | ||
post_callback(self._make_message(msg["id"], None)) | ||
elif cmd == "table": | ||
try: | ||
# create a new Table and track it | ||
data_or_schema = msg["args"][0] | ||
self._tables[msg["name"]] = Table(data_or_schema, msg.get("options", {})) | ||
except IndexError: | ||
self._tables[msg["name"]] = [] | ||
elif cmd == "view": | ||
# create a new view and track it | ||
new_view = self._tables[msg["table_name"]].view(msg.get("config", {})) | ||
self._views[msg["view_name"]] = new_view | ||
elif cmd == "table_method" or cmd == "view_method": | ||
self._process_method_call(msg, post_callback) | ||
|
||
def _process_method_call(self, msg, post_callback): | ||
'''When the client calls a method, validate the instance it calls on and return the result.''' | ||
if msg["cmd"] == "table_method": | ||
table_or_view = self._tables.get(msg["name"], None) | ||
else: | ||
table_or_view = self._views.get(msg["name"], None) | ||
if table_or_view is None: | ||
post_callback(self._make_error_message(msg["id"], "View is not initialized")) | ||
try: | ||
if msg.get("subscribe", False) is True: | ||
self._process_subscribe(msg, table_or_view, post_callback) | ||
else: | ||
args = msg.get("args", []) | ||
if msg["method"] == "schema": | ||
args = [True] # make sure schema returns string types | ||
if msg["method"] != "delete": | ||
result = getattr(table_or_view, msg["method"])(*args) | ||
post_callback(self._make_message(msg["id"], result)) | ||
else: | ||
if msg["cmd"] == "view_method": | ||
del self._views[msg["name"]] | ||
except Exception as error: | ||
print(self._make_error_message(msg["id"], error)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove print |
||
|
||
def _process_subscribe(self, msg, table_or_view, post_callback): | ||
'''When the client attempts to add or remove a subscription callback, validate and perform the requested operation. | ||
|
||
Params: | ||
msg (dict) : the message from the client | ||
table_or_view {Table|View} : the instance that the subscription will be called on | ||
post_callback (callable) : a method that notifies the client with new data | ||
''' | ||
try: | ||
callback = None | ||
callback_id = msg.get("callback_id", None) | ||
method = msg.get("method", None) | ||
if method and method[:2] == "on": | ||
# wrap the callback | ||
callback = partial(PerspectiveManager.callback, msg=msg, post_callback=post_callback, self=self) | ||
if callback_id: | ||
self._callback_cache[callback_id] = callback | ||
elif callback_id is not None: | ||
# remove the callback with `callback_id` | ||
del self._callback_cache[callback_id] | ||
if callback is not None: | ||
# call the underlying method on the Table or View | ||
getattr(table_or_view, method)(callback, *msg.get("args", [])) | ||
else: | ||
print("callback not found for remote call {}".format(msg)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. avoid prints, use logging if you want to print |
||
except Exception as error: | ||
print(self._make_error_message(msg["id"], error)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove print |
||
|
||
def callback(self, **kwargs): | ||
'''Return a message to the client using the `post_callback` method.''' | ||
id = kwargs.get("msg")["id"] | ||
data = kwargs.get("event", None) | ||
post_callback = kwargs.get("post_callback") | ||
post_callback(self._make_message(id, data)) | ||
|
||
def _make_message(self, id, result): | ||
'''Return a serializable message for a successful result.''' | ||
return { | ||
"id": id, | ||
"data": result | ||
} | ||
|
||
def _make_error_message(self, id, error): | ||
'''Return a serializable message for an error result.''' | ||
return { | ||
"id": id, | ||
"error": error | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i wouldnt do this