Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronously process updates when running in Tornado #838

Merged
merged 14 commits into from
Dec 11, 2019
26 changes: 13 additions & 13 deletions packages/perspective/src/js/perspective.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ export default function(Module) {
function _set_process(pool, table_id) {
if (!_POOL_DEBOUNCES[table_id]) {
_POOL_DEBOUNCES[table_id] = pool;
setTimeout(() => _clear_process(table_id));
setTimeout(() => _call_process(table_id));
}
}

function _clear_process(table_id) {
function _call_process(table_id) {
const pool = _POOL_DEBOUNCES[table_id];
if (pool) {
pool._process();
_reset_process(table_id);
_remove_process(table_id);
}
}

function _reset_process(table_id) {
function _remove_process(table_id) {
delete _POOL_DEBOUNCES[table_id];
}

Expand Down Expand Up @@ -175,7 +175,7 @@ export default function(Module) {
* @async
*/
view.prototype.delete = function() {
_reset_process(this.table.get_id());
_remove_process(this.table.get_id());
this._View.delete();
this.ctx.delete();

Expand Down Expand Up @@ -288,7 +288,7 @@ export default function(Module) {
* @private
*/
const to_format = function(options, formatter) {
_clear_process(this.table.get_id());
_call_process(this.table.get_id());
options = options || {};
const max_cols = this._View.num_columns() + (this.sides() === 0 ? 0 : 1);
const max_rows = this._View.num_rows();
Expand Down Expand Up @@ -719,7 +719,7 @@ export default function(Module) {
* - "row": The callback is invoked with an Arrow of the updated rows.
*/
view.prototype.on_update = function(callback, {mode = "none"} = {}) {
_clear_process(this.table.get_id());
_call_process(this.table.get_id());
if (["none", "cell", "row"].indexOf(mode) === -1) {
throw new Error(`Invalid update mode "${mode}" - valid modes are "none", "cell" and "row".`);
}
Expand Down Expand Up @@ -773,7 +773,7 @@ export default function(Module) {
}

view.prototype.remove_update = function(callback) {
_clear_process(this.table.get_id());
_call_process(this.table.get_id());
const total = this.callbacks.length;
filterInPlace(this.callbacks, x => x.orig_callback !== callback);
console.assert(total > this.callbacks.length, `"callback" does not match a registered updater`);
Expand Down Expand Up @@ -935,18 +935,18 @@ export default function(Module) {
* the schema and construction options.
*/
table.prototype.clear = function() {
_reset_process(this.get_id());
_remove_process(this.get_id());
this._Table.reset_gnode(this.gnode_id);
};

/**
* Replace all rows in this {@link module:perspective~table} the input data.
*/
table.prototype.replace = function(data) {
_reset_process(this.get_id());
_remove_process(this.get_id());
this._Table.reset_gnode(this.gnode_id);
this.update(data);
_clear_process(this._Table.get_id());
_call_process(this._Table.get_id());
};

/**
Expand All @@ -959,7 +959,7 @@ export default function(Module) {
if (this.views.length > 0) {
throw "Table still has contexts - refusing to delete.";
}
_reset_process(this.get_id());
_remove_process(this.get_id());
this._Table.unregister_gnode(this.gnode_id);
this._Table.delete();
this._delete_callbacks.forEach(callback => callback());
Expand Down Expand Up @@ -1139,7 +1139,7 @@ export default function(Module) {
* supplied configuration, bound to this table
*/
table.prototype.view = function(_config = {}) {
_clear_process(this.get_id());
_call_process(this.get_id());
let config = {};
for (const key of Object.keys(_config)) {
if (defaults.CONFIG_ALIASES[key]) {
Expand Down
71 changes: 52 additions & 19 deletions python/perspective/perspective/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,42 +65,83 @@ def __init__(self):
self._tables = {}
self._views = {}
self._callback_cache = {}
self._queue_process_callback = None

def host(self, data, name=None):
def host(self, item, name=None):
"""Given a :obj:`~perspective.Table` or :obj:`~perspective.View`,
place it under management and allow operations on it to be passed
through the Manager instance.

Args:
table_or_view (:obj:`~perspective.Table`/:obj:`~perspective.View`) :
a Table or View to be managed.

Keyword Args:
name (:obj:`str`) : an optional name to allow retrieval through
`get_table` or `get_view`. A name will be generated if not
provided.
"""
name = name or gen_name()
if isinstance(data, Table):
self._tables[name] = data
elif isinstance(data, View):
self._views[name] = data
if isinstance(item, Table):
self.host_table(name, item)
elif isinstance(item, View):
self.host_view(name, item)
else:
raise PerspectiveError(
"Only `Table()` and `View()` instances can be hosted.")

def host_table(self, name, table):
'''Given a reference to a `Table`, manage it and allow operations on it
to occur through the Manager.

If a function for `queue_process` is defined (i.e., by
:obj:`~perspective.PerspectiveTornadoHandler`), bind the function to
`Table` and have it call the manager's version of `queue_process`.
'''
name = name or gen_name()
if self._queue_process_callback is not None:
# always bind the callback to the table's state manager
table._state_manager.queue_process = partial(
self._queue_process_callback, state_manager=table._state_manager)
self._tables[name] = table
return name

def host_view(self, name, view):
'''Given a reference to a `View`, add it to the manager's views
'''Given a :obj:`~perspective.View`, add it to the manager's views
container.
'''
self._views[name] = view

def get_table(self, name):
'''Return a table under management by name.'''
return self._tables.get(name, None)

def get_view(self, name):
'''Return a view under management by name.'''
return self._views.get(name, None)

def new_session(self):
return PerspectiveSession(self)

def _set_queue_process(self, func):
"""For each table under management, bind `func` to the table's state
manager and to run whenever `queue_process` is called.

After this method is called, future Tables hosted on this manager
instance will call the same `queue_process` callback.
"""
self._queue_process_callback = func
for table in self._tables.values():
table._state_manager.queue_process = partial(
self._queue_process_callback, state_manager=table._state_manager)

def _process(self, msg, post_callback, client_id=None):
'''Given a message from the client, process it through the Perspective
engine.

Args:
msg (dict): a message from the client with instructions that map to
engine operations post_callback (callable): a function that
returns data to the client
msg (:obj`dict`): a message from the client with instructions
that map to engine operations
post_callback (:obj`callable`): a function that returns data to the client
'''
if isinstance(msg, str):
if msg == "heartbeat": # TODO fix this
Expand Down Expand Up @@ -172,7 +213,7 @@ def _process_method_call(self, msg, post_callback):
# make sure schema returns string types
args["as_string"] = True
elif msg["method"].startswith("to_"):
# TODO
# parse options in `to_format` calls
for d in msg.get("args", []):
args.update(d)
else:
Expand Down Expand Up @@ -262,11 +303,3 @@ def _make_error_message(self, id, error):
"id": id,
"error": error
}

def get_table(self, name):
'''Return a table under management by name.'''
return self._tables.get(name, None)

def get_view(self, name):
'''Return a view under management by name.'''
return self._views.get(name, None)
3 changes: 1 addition & 2 deletions python/perspective/perspective/src/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ std::shared_ptr<Table> make_table_py(t_val table, t_data_accessor accessor, t_va
// calculate offset, limit, and set the gnode
tbl->init(data_table, row_count, op);

// FIXME: replicate JS _clear_process etc.
pool->_process();
//pool->_process();
return tbl;
}

Expand Down
1 change: 1 addition & 0 deletions python/perspective/perspective/table/_data_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def _mod(a, b):


def to_format(options, view, output_format):
view._table._state_manager.call_process(view._table._table.get_id())
options, column_names, data_slice = _to_format_helper(view, options)

if output_format == 'records':
Expand Down
92 changes: 92 additions & 0 deletions python/perspective/perspective/table/_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# *****************************************************************************
#
# Copyright (c) 2019, the Perspective Authors.
#
# This file is part of the Perspective library, distributed under the terms of
# the Apache License 2.0. The full license can be found in the LICENSE file.
#


class _PerspectiveStateManager(object):
"""Internal state management class that controls when `_process` is called
within the C++ Table internals.

`_process()` notifies the engine to clear its queue of pending updates to be
applied and reconciled. When Perspective runs within an event loop, we
should use the loop whenever possible to batch calls to `_process()`.
Callers that have access to an event loop implementation should set
`queue_process` to their own function with `table_id` and `state_manager` as
positional arguments.

Override functions must be bound to this instance using `functools.partial`,
i.e.: `functools.partial(queue_process_custom,
state_manager=table._state_manager)

The guarantee of `queue_process` is that `call_process` will be called,
either on the next iteration of the event loop or before output is generated
(through a serialization method, for example).

Though each :obj:`~perspective.Table` contains a separate instance of the
state manager, `TO_PROCESS`, which contains the `t_pool` objects for pending
`_process` calls, is shared amongst all instances of the state manager.
"""
TO_PROCESS = {}

def __init__(self):
"""Create a new instance of the state manager, and enable the default behavior
of calling `_process()` synchronously.

New instances of :obj:`_PerspectiveStateManager` have no awareness of whether
an event loop is present - the instance's `queue_process` method must be
overridden.
"""
self.queue_process = self._queue_process_immediate

def set_process(self, pool, table_id):
"""Queue a `_process` call on the specified pool and table ID.

Checks whether a `_process()` call has been queued already for the specified
table, and calls `queue_process`, which MUST be implemented by the caller.

Args:
pool (:obj`libbinding.t_pool`): a `t_pool` object
table_id (:obj`int`): a unique ID for the Table
"""
if table_id not in _PerspectiveStateManager.TO_PROCESS:
_PerspectiveStateManager.TO_PROCESS[table_id] = pool
self.queue_process(table_id)

def call_process(self, table_id):
"""Given a table_id, find the corresponding pool and call `process()`
on it, which takes all the updates that have been queued, applies each
update to the global Table state, and then clears the queue.

Args:
table_id (:obj`int`): The unique ID of the Table
"""
pool = _PerspectiveStateManager.TO_PROCESS.get(table_id, None)
if pool is not None:
pool._process()
self.remove_process(table_id)

def remove_process(self, table_id):
"""Remove a pool from the execution cache, indicating that it should no
longer be operated on.

Args:
table_id (:obj`int`): The unique ID of the Table
"""
_PerspectiveStateManager.TO_PROCESS.pop(table_id, None)

def _queue_process_immediate(self, table_id):
"""Immediately execute `call_process` on the pool as soon
as `queue_process` is called.

This is the default implementation of `queue_process` for environments
without an event loop, meaning that calls to :obj:`~perspective.Table`'s
`update()` method are immediately followed by a call to `_process`.

Args:
table_id (:obj`int`): The unique ID of the Table
"""
self.call_process(table_id)
Loading