Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Python client implementation #1199

Merged
merged 2 commits into from
Sep 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions packages/perspective/test/js/updates.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,40 +70,43 @@ const arrow_indexed_result = [
module.exports = perspective => {
describe("Removes", function() {
it("after an `update()`", async function() {
var table = perspective.table(meta, {index: "x"});
const table = perspective.table(meta, {index: "x"});
table.update(data);
var view = table.view();
const view = table.view();
table.remove([1, 2]);
let result = await view.to_json();
const result = await view.to_json();
expect(result.length).toEqual(2);
expect(result).toEqual(data.slice(2, 4));
// expect(await table.size()).toEqual(2);
view.delete();
table.delete();
});

it("after a regular data load", async function() {
var table = perspective.table(data, {index: "x"});
var view = table.view();
const table = perspective.table(data, {index: "x"});
const view = table.view();
table.remove([1, 2]);
let result = await view.to_json();
const result = await view.to_json();
expect(result.length).toEqual(2);
expect(result).toEqual(data.slice(2, 4));
// expect(await table.size()).toEqual(2);
view.delete();
table.delete();
});

it("multiple single element removes", async function() {
let table = perspective.table(meta, {index: "x"});
const table = perspective.table(meta, {index: "x"});
for (let i = 0; i < 100; i++) {
table.update([{x: i, y: "test", z: false}]);
}
for (let i = 1; i < 100; i++) {
table.remove([i]);
}
let view = table.view();
let result = await view.to_json();
const view = table.view();
const result = await view.to_json();
expect(result).toEqual([{x: 0, y: "test", z: false}]);
expect(result.length).toEqual(1);
// expect(await table.size()).toEqual(1);
view.delete();
table.delete();
});
Expand Down
11 changes: 11 additions & 0 deletions python/perspective/perspective/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
################################################################################
#
# Copyright (c) 2019, the Perspective Authors.
#
# This file is part of the Perspective library, distributed under the terms of
# the Apache License 2.0. The full license can be found in the LICENSE file.
#

from .client import PerspectiveClient # noqa: F401

__all__ = ["PerspectiveClient"]
145 changes: 145 additions & 0 deletions python/perspective/perspective/client/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
################################################################################
#
# Copyright (c) 2019, the Perspective Authors.
#
# This file is part of the Perspective library, distributed under the terms of
# the Apache License 2.0. The full license can be found in the LICENSE file.
#
from random import random

from tornado.concurrent import Future
from ..core.exception import PerspectiveError
from .table_api import PerspectiveTableProxy
from .table_api import table as make_table
from .view_api import PerspectiveViewProxy


class PerspectiveClient(object):
def __init__(self):
"""A base class for a Perspective client implementation that offers
a fully async API through an event loop.

Child classes must implement `send()`, which defines how a message
should be delivered to the Perspective server implementation, as well
as call `set_event_loop` in their `__init__()`.
"""
self._msg_id = 0
self._handlers = {}
self._loop = None
self._create_future = None

# Because we can't reuse a `Future` in Python, we can't just resolve
# the same `Future` over and over again in order to actually execute
# on_update/on_delete etc. callbacks.
#
# Instead, we store the callbacks on the client and use `callback_id`
# to access callback references. `self._callback_cache` is a map of
# callbacks to integer IDs, which is required to allow the `remove_{}`
# APIs to receive a callable to remove.
#
# To execute the callbacks themselves, we can look up the callback
# in `self._callback_id_cache` (a dict of integer ID to callback),
# and execute it in the `_handle` method.
self._callback_cache = {}
self._callback_id_cache = {}
self._callback_id = 0

def set_event_loop(self, loop):
"""Given an event loop, decide how to create a new `Future` object
using the loop, and attach the loop to this client instance.

Supported loops include `tornado.ioloop.IOLoop`, which implements a
slightly different API for `Future` creation (which we try to detect),
and the `asyncio` loop (and any loop that implements `create_future`).

Args:
loop: a reference to an event loop in the current thread.
"""
if getattr(loop, "add_future", None):
# Tornado IOLoop - we must manually create the future
self._create_future = lambda: Future()
elif getattr(loop, "create_future", None):
self._create_future = loop.create_future
else:
raise AttributeError("Loop must implement `create_future` or `add_future`!")
self._loop = loop

def open_table(self, name):
"""Return a proxy Table to a `Table` hosted in a server somewhere."""
return PerspectiveTableProxy(self, name)

def open_view(self, name):
"""Return a proxy View to a `View` hosted in a server somewhere."""
return PerspectiveViewProxy(self, name)

def _handle(self, msg):
"""Given a response from the Perspective server, resolve the Future
with the response or an exception."""
if not msg.get("data"):
return

handler = self._handlers.get(msg["data"].get("id"))

if handler:
future = handler["future"]

keep_alive = handler.get("keep_alive")

if keep_alive and handler.get("callback_id") and future.done():
# Must look up callback function and execute it, and then
# return without re-setting the result of the Future.
callback = self._callback_id_cache.get(handler["callback_id"])
data = msg["data"]["data"]
if data and isinstance(data, dict):
callback(**data)
elif data:
callback(data)
else:
callback()
return

if msg["data"].get("error"):
future.set_exception(PerspectiveError(msg["data"]["error"]))
else:
future.set_result(msg["data"]["data"])

if not keep_alive:
del self._handlers[msg["data"]["id"]]

def send(self, msg):
"""Send the message to the Perspective server implementation - must be
implemented by a child class."""
raise NotImplementedError()

def post(self, msg, future=None, keep_alive=False):
"""Given a message and an associated `Future` object, store the future
and send the message to the server."""
if self._loop is None:
raise PerspectiveError("An event loop must be set on `PerspectiveClient`!")

if future:
handler = {
"future": future,
"keep_alive": keep_alive,
}

if keep_alive and msg.get("callback_id"):
handler["callback_id"] = msg["callback_id"]

self._msg_id += 1
self._handlers[self._msg_id] = handler

msg["id"] = self._msg_id

self.send(msg)

def table(self, data, index=None, limit=None, name=str(random())):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default argument to name here is evaluated once when the method is created, so all proxy tables will have the same name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup - that is a silly mistake on my part

"""Create a new `Table` in the server implementation, and return
a proxy to it."""
return make_table(self, data, index, limit, name)

def terminate():
"""Close the connection between the server and client. Must be
implemented by child classes, although only as part of a public API
as `terminate()` should only be called by the user."""
raise NotImplementedError()
93 changes: 93 additions & 0 deletions python/perspective/perspective/client/dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
################################################################################
#
# Copyright (c) 2019, the Perspective Authors.
#
# This file is part of the Perspective library, distributed under the terms of
# the Apache License 2.0. The full license can be found in the LICENSE file.
#


def async_queue(client, name, method, cmd, *args, **kwargs):
"""Given a method, command, and expected arguments, create a message to
send to the Perspective server implementation and `post` it through
the client."""
arguments = list(args)

if len(kwargs) > 0:
arguments.append(kwargs)

msg = {
"cmd": cmd,
"name": name,
"method": method,
"args": arguments,
"subscribe": False,
}

future = client._create_future()
client.post(msg, future)
return future


def subscribe(client, name, method, cmd, *args, **kwargs):
"""Subscribe to an event that occurs on the Perspective server
implementation, like `on_update`."""
arguments = list(args)
callback = None

for i in range(len(arguments)):
if callable(arguments[i]):
callback = arguments.pop(i)

if len(kwargs) > 0:
arguments.append(kwargs)

# Instead of storing in a global map, store the callbacks on the
# client itself.
client._callback_id += 1
client._callback_cache[callback] = client._callback_id
client._callback_id_cache[client._callback_id] = callback

msg = {
"cmd": cmd,
"name": name,
"method": method,
"args": arguments,
"subscribe": True,
"callback_id": client._callback_id,
}

future = client._create_future()
client.post(msg, future, keep_alive=True)
return future


def unsubscribe(client, name, method, cmd, *args, **kwargs):
"""Unsubscribe from an event that occurs on the Perspective server
implementation, like `remove_update`."""
arguments = list(args)
callback = None

for i in range(len(arguments)):
if callable(arguments[i]):
callback = arguments.pop(i)

if len(kwargs) > 0:
arguments.append(kwargs)

callback_id = client._callback_cache.get(callback)
del client._callback_cache[callback]
del client._callback_id_cache[callback_id]

msg = {
"cmd": cmd,
"name": name,
"method": method,
"args": arguments,
"subscribe": True,
"callback_id": callback_id,
}

future = client._create_future()
client.post(msg, future, keep_alive=True)
return future
Loading