diff --git a/cpp/perspective/CMakeLists.txt b/cpp/perspective/CMakeLists.txt index 563089d3c5..19e74d1b47 100644 --- a/cpp/perspective/CMakeLists.txt +++ b/cpp/perspective/CMakeLists.txt @@ -514,6 +514,7 @@ set (SOURCE_FILES ${PSP_CPP_SRC}/src/cpp/pool.cpp ${PSP_CPP_SRC}/src/cpp/port.cpp ${PSP_CPP_SRC}/src/cpp/process_state.cpp + ${PSP_CPP_SRC}/src/cpp/pyutils.cpp ${PSP_CPP_SRC}/src/cpp/raii.cpp ${PSP_CPP_SRC}/src/cpp/raii_impl_linux.cpp ${PSP_CPP_SRC}/src/cpp/raii_impl_osx.cpp diff --git a/cpp/perspective/src/cpp/gnode.cpp b/cpp/perspective/src/cpp/gnode.cpp index 94d02eb77a..52d29087b5 100644 --- a/cpp/perspective/src/cpp/gnode.cpp +++ b/cpp/perspective/src/cpp/gnode.cpp @@ -20,6 +20,10 @@ #include #include +#ifdef PSP_ENABLE_PYTHON +#include +#endif + namespace perspective { t_tscalar @@ -250,7 +254,7 @@ t_gnode::_process_table(t_uindex port_id) { return result; } - std::shared_ptr input_port = m_input_ports[port_id]; + std::shared_ptr& input_port = m_input_ports[port_id]; if (input_port->get_table()->size() == 0) { return result; @@ -288,7 +292,7 @@ t_gnode::_process_table(t_uindex port_id) { // contexts obliquely read gnode state at various points. _update_contexts_from_state(flattened); - release_inputs(); + input_port->release(); release_outputs(); #ifdef PSP_GNODE_VERIFY @@ -593,6 +597,9 @@ bool t_gnode::process(t_uindex port_id) { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "Cannot `process` on an uninited gnode."); +#ifdef PSP_ENABLE_PYTHON + PerspectiveScopedGILRelease acquire(m_event_loop_thread_id); +#endif t_process_table_result result = _process_table(port_id); @@ -1347,6 +1354,13 @@ t_gnode::repr() const { return ss.str(); } +#ifdef PSP_ENABLE_PYTHON +void +t_gnode::set_event_loop_thread_id(std::thread::id id) { + m_event_loop_thread_id = id; +} +#endif + void t_gnode::register_context(const std::string& name, std::shared_ptr ctx) { _register_context(name, ZERO_SIDED_CONTEXT, reinterpret_cast(ctx.get())); diff --git a/cpp/perspective/src/cpp/pool.cpp b/cpp/perspective/src/cpp/pool.cpp index eb0947edf7..790f7a9790 100644 --- a/cpp/perspective/src/cpp/pool.cpp +++ b/cpp/perspective/src/cpp/pool.cpp @@ -13,7 +13,9 @@ #include #include #include +#ifdef PSP_ENABLE_PYTHON #include +#endif #include namespace perspective { @@ -48,6 +50,7 @@ empty_callback() { t_pool::t_pool() : m_update_delegate(empty_callback()) + , m_event_loop_thread_id(std::thread::id()) , m_sleep(0) { m_run.clear(); } @@ -83,6 +86,11 @@ t_pool::register_gnode(t_gnode* node) { t_uindex id = m_gnodes.size() - 1; node->set_id(id); node->set_pool_cleanup([this, id]() { this->m_gnodes[id] = 0; }); +#ifdef PSP_ENABLE_PYTHON + if (m_event_loop_thread_id != std::thread::id()) { + node->set_event_loop_thread_id(m_event_loop_thread_id); + } +#endif if (t_env::log_progress()) { std::cout << "t_pool.register_gnode node => " << node << " rv => " << id << std::endl; @@ -124,8 +132,21 @@ t_pool::send(t_uindex gnode_id, t_uindex port_id, const t_data_table& table) { } } +#ifdef PSP_ENABLE_PYTHON +void t_pool::set_event_loop() { + m_event_loop_thread_id = std::this_thread::get_id(); + for (auto node : m_gnodes) { + node->set_event_loop_thread_id(m_event_loop_thread_id); + } +} + +std::thread::id t_pool::get_event_loop_thread_id() const { + return m_event_loop_thread_id; +} +#endif + void -t_pool::_process_helper() { +t_pool::_process() { auto work_to_do = m_data_remaining.load(); if (work_to_do) { t_update_task task(*this); @@ -133,18 +154,10 @@ t_pool::_process_helper() { } } -void -t_pool::_process() { - t_uindex sleep_time = m_sleep.load(); - _process_helper(); - std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); -} - void t_pool::stop() { m_run.clear(std::memory_order_release); - _process_helper(); - + _process(); if (t_env::log_progress()) { std::cout << "t_pool.stop" << std::endl; } diff --git a/cpp/perspective/src/cpp/pyutils.cpp b/cpp/perspective/src/cpp/pyutils.cpp index fb34f9d838..9c8bc7cdec 100644 --- a/cpp/perspective/src/cpp/pyutils.cpp +++ b/cpp/perspective/src/cpp/pyutils.cpp @@ -8,39 +8,28 @@ */ #include +#include #include -#include -#include -#include - +#ifdef PSP_ENABLE_PYTHON namespace perspective { -bool -curthread_has_gil() { - auto tstate = _PyThreadState_Current; - return tstate && (tstate == PyGILState_GetThisThreadState()); -} - -void -print_python_stack() { - auto tstate = PyThreadState_GET(); - if (0 != tstate && 0 != tstate->frame) { - auto f = tstate->frame; - std::cout << "Python stack trace:" << std::endl; - while (0 != f) { - auto line = PyCode_Addr2Line(f->f_code, f->f_lasti); - auto filename = PyString_AsString(f->f_code->co_filename); - auto funcname = PyString_AsString(f->f_code->co_name); - std::cout << "\t" << filename << ":" << line << " : " << funcname << std::endl; - f = f->f_back; +PerspectiveScopedGILRelease::PerspectiveScopedGILRelease(std::thread::id event_loop_thread_id) : m_thread_state(NULL) { + if (event_loop_thread_id != std::thread::id()) { + if (std::this_thread::get_id() != event_loop_thread_id) { + std::stringstream err; + err << "Perspective called from wrong thread; Expected " << event_loop_thread_id << "; Got " << std::this_thread::get_id() << std::endl; + PSP_COMPLAIN_AND_ABORT(err.str()); } + m_thread_state = PyEval_SaveThread(); } } -std::string -repr(PyObject* pyo) { - PyObjectPtr repr(PyObject_Repr(pyo)); - return std::string(PyString_AsString(repr.get())); +PerspectiveScopedGILRelease::~PerspectiveScopedGILRelease() { + if (m_thread_state != NULL) { + PyEval_RestoreThread(m_thread_state); + } } -} // end namespace perspective \ No newline at end of file +} // end namespace perspective + +#endif \ No newline at end of file diff --git a/cpp/perspective/src/cpp/view.cpp b/cpp/perspective/src/cpp/view.cpp index 1ed27d56c3..449ce8fb9d 100644 --- a/cpp/perspective/src/cpp/view.cpp +++ b/cpp/perspective/src/cpp/view.cpp @@ -770,6 +770,14 @@ View::is_column_only() const { return m_view_config->is_column_only(); } +#ifdef PSP_ENABLE_PYTHON +template +std::thread::id +View::get_event_loop_thread_id() const { + return m_table->get_pool()->get_event_loop_thread_id(); +}; +#endif + /****************************************************************************** * * Private diff --git a/cpp/perspective/src/include/perspective/gnode.h b/cpp/perspective/src/include/perspective/gnode.h index bbde6127d5..3ffe6a55d2 100644 --- a/cpp/perspective/src/include/perspective/gnode.h +++ b/cpp/perspective/src/include/perspective/gnode.h @@ -25,6 +25,9 @@ #include #include #include +#ifdef PSP_ENABLE_PYTHON +#include +#endif #ifdef PSP_PARALLEL_FOR #include #include @@ -190,6 +193,10 @@ class PERSPECTIVE_EXPORT t_gnode { void pprint() const; std::string repr() const; +#ifdef PSP_ENABLE_PYTHON + void set_event_loop_thread_id(std::thread::id id); +#endif + protected: /** * @brief Given `tbl`, notify each registered context with `tbl`. @@ -390,6 +397,10 @@ class PERSPECTIVE_EXPORT t_gnode { std::vector m_custom_columns; std::function m_pool_cleanup; bool m_was_updated; + +#ifdef PSP_ENABLE_PYTHON + std::thread::id m_event_loop_thread_id; +#endif }; /** diff --git a/cpp/perspective/src/include/perspective/pool.h b/cpp/perspective/src/include/perspective/pool.h index 3358d46a89..275dec5298 100644 --- a/cpp/perspective/src/include/perspective/pool.h +++ b/cpp/perspective/src/include/perspective/pool.h @@ -15,6 +15,10 @@ #include #include +#ifdef PSP_ENABLE_PYTHON +#include +#endif + #if defined PSP_ENABLE_WASM #include typedef emscripten::val t_val; @@ -57,6 +61,11 @@ class PERSPECTIVE_EXPORT t_pool { t_uindex gnode_id, const std::string& name, t_ctx_type type, std::int64_t ptr); #endif +#ifdef PSP_ENABLE_PYTHON + void set_event_loop(); + std::thread::id get_event_loop_thread_id() const; +#endif + /** * @brief Call the binding language's `update_callback` method, * set at initialize time. @@ -74,7 +83,7 @@ class PERSPECTIVE_EXPORT t_pool { void send(t_uindex gnode_id, t_uindex port_id, const t_data_table& table); void _process(); - void _process_helper(); + void init(); void stop(); void set_sleep(t_uindex ms); @@ -102,6 +111,9 @@ class PERSPECTIVE_EXPORT t_pool { bool validate_gnode_id(t_uindex gnode_id) const; private: +#ifdef PSP_ENABLE_PYTHON + std::thread::id m_event_loop_thread_id; +#endif std::mutex m_mtx; std::vector m_gnodes; diff --git a/cpp/perspective/src/include/perspective/pyutils.h b/cpp/perspective/src/include/perspective/pyutils.h new file mode 100644 index 0000000000..0c9bf8431b --- /dev/null +++ b/cpp/perspective/src/include/perspective/pyutils.h @@ -0,0 +1,30 @@ +/****************************************************************************** + * + * Copyright (c) 2017, the Perspective Authors. + * + * This file is part of the Perspective library, distributed under the terms of + * the Apache License 2.0. The full license can be found in the LICENSE file. + * + */ + +#pragma once +#include + +#ifdef PSP_ENABLE_PYTHON +#include + +namespace perspective { + +class PerspectiveScopedGILRelease { + public: + PerspectiveScopedGILRelease(std::thread::id event_loop_thread_id); + ~PerspectiveScopedGILRelease(); + private: + PyThreadState* m_thread_state; +}; + + +void _set_event_loop(); + +} // namespace perspective +#endif \ No newline at end of file diff --git a/cpp/perspective/src/include/perspective/view.h b/cpp/perspective/src/include/perspective/view.h index 04046453af..25d9410573 100644 --- a/cpp/perspective/src/include/perspective/view.h +++ b/cpp/perspective/src/include/perspective/view.h @@ -21,6 +21,9 @@ #include #include #include +#ifdef PSP_ENABLE_PYTHON +#include +#endif namespace perspective { @@ -224,6 +227,9 @@ class PERSPECTIVE_EXPORT View { t_stepdelta get_step_delta(t_index bidx, t_index eidx) const; t_dtype get_column_dtype(t_uindex idx) const; bool is_column_only() const; +#ifdef PSP_ENABLE_PYTHON + std::thread::id get_event_loop_thread_id() const; +#endif private: /** diff --git a/python/perspective/perspective/include/perspective/python.h b/python/perspective/perspective/include/perspective/python.h index 9e1c15cc05..beeb61ce90 100644 --- a/python/perspective/perspective/include/perspective/python.h +++ b/python/perspective/perspective/include/perspective/python.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -255,6 +256,7 @@ PYBIND11_MODULE(libbinding, m) .def(py::init<>()) .def("set_update_delegate", &t_pool::set_update_delegate) .def("unregister_gnode", &t_pool::unregister_gnode) + .def("set_event_loop", &t_pool::set_event_loop) .def("_process", &t_pool::_process); /****************************************************************************** @@ -384,7 +386,6 @@ PYBIND11_MODULE(libbinding, m) m.def("make_computations", &make_computations); m.def("scalar_to_py", &scalar_to_py); m.def("_set_nthreads", &_set_nthreads); - m.def("_set_event_loop", &_set_event_loop); } #endif diff --git a/python/perspective/perspective/include/perspective/python/serialization.h b/python/perspective/perspective/include/perspective/python/serialization.h index d4162ecdd4..baf54466dd 100644 --- a/python/perspective/perspective/include/perspective/python/serialization.h +++ b/python/perspective/perspective/include/perspective/python/serialization.h @@ -11,6 +11,7 @@ #include #include +#include #include #include diff --git a/python/perspective/perspective/include/perspective/python/table.h b/python/perspective/perspective/include/perspective/python/table.h index ac365690bc..f4224616fd 100644 --- a/python/perspective/perspective/include/perspective/python/table.h +++ b/python/perspective/perspective/include/perspective/python/table.h @@ -11,6 +11,7 @@ #include #include +#include #include namespace perspective { diff --git a/python/perspective/perspective/include/perspective/python/utils.h b/python/perspective/perspective/include/perspective/python/utils.h index c1ff6b346b..8bbd65cb94 100644 --- a/python/perspective/perspective/include/perspective/python/utils.h +++ b/python/perspective/perspective/include/perspective/python/utils.h @@ -19,16 +19,6 @@ namespace perspective { namespace binding { -class PerspectiveScopedGILRelease { - public: - PerspectiveScopedGILRelease(); - ~PerspectiveScopedGILRelease(); - private: - PyThreadState* thread_state; -}; - - -void _set_event_loop(); void _set_nthreads(int nthreads); diff --git a/python/perspective/perspective/include/perspective/python/view.h b/python/perspective/perspective/include/perspective/python/view.h index be7c33b7cb..e06dfb2eac 100644 --- a/python/perspective/perspective/include/perspective/python/view.h +++ b/python/perspective/perspective/include/perspective/python/view.h @@ -11,6 +11,7 @@ #include #include +#include #include #include diff --git a/python/perspective/perspective/manager/manager.py b/python/perspective/perspective/manager/manager.py index b36ae9ecf5..e9b2e44918 100644 --- a/python/perspective/perspective/manager/manager.py +++ b/python/perspective/perspective/manager/manager.py @@ -11,7 +11,6 @@ from functools import partial from ..core.exception import PerspectiveError from ..table import Table -from ..table.libbinding import _set_event_loop from ..table.view import View from .session import PerspectiveSession from .manager_internal import _PerspectiveManagerInternal @@ -97,6 +96,7 @@ def host_table(self, name, table): """ if self._loop_callback is not None: # always bind the callback to the table's state manager + self._loop_callback(lambda: table._table.get_pool().set_event_loop()) table._state_manager.queue_process = partial( self._loop_callback, table._state_manager.call_process ) @@ -132,9 +132,9 @@ def set_loop_callback(self, loop_callback): and schedules it to run on the same thread on which `set_loop_callback()` was originally invoked. """ - loop_callback(_set_event_loop) self._loop_callback = loop_callback for table in self._tables.values(): + loop_callback(lambda: table._table.get_pool().set_event_loop()) table._state_manager.queue_process = partial( loop_callback, table._state_manager.call_process ) diff --git a/python/perspective/perspective/manager/manager_internal.py b/python/perspective/perspective/manager/manager_internal.py index 0140cc7378..0ab6be08a9 100644 --- a/python/perspective/perspective/manager/manager_internal.py +++ b/python/perspective/perspective/manager/manager_internal.py @@ -315,7 +315,10 @@ def clear_views(self, client_id): for name, view in self._views.items(): if view._client_id == client_id: - view.delete() + if self._loop_callback: + self._loop_callback(view.delete) + else: + view.delete() names.append(name) count += 1 diff --git a/python/perspective/perspective/src/serialization.cpp b/python/perspective/perspective/src/serialization.cpp index 7f59151ba6..0d55c0d410 100644 --- a/python/perspective/perspective/src/serialization.cpp +++ b/python/perspective/perspective/src/serialization.cpp @@ -25,7 +25,7 @@ template std::shared_ptr> get_data_slice(std::shared_ptr> view, std::uint32_t start_row, std::uint32_t end_row, std::uint32_t start_col, std::uint32_t end_col) { - PerspectiveScopedGILRelease acquire; + PerspectiveScopedGILRelease acquire(view->get_event_loop_thread_id()); auto data_slice = view->get_data(start_row, end_row, start_col, end_col); return data_slice; } diff --git a/python/perspective/perspective/src/table.cpp b/python/perspective/perspective/src/table.cpp index 92dc13fc07..a8fe5bd0d0 100644 --- a/python/perspective/perspective/src/table.cpp +++ b/python/perspective/perspective/src/table.cpp @@ -42,6 +42,8 @@ std::shared_ptr make_table_py(t_val table, t_data_accessor accessor, gnode = tbl->get_gnode(); offset = tbl->get_offset(); is_update = (is_update || gnode->mapping_size() > 0); + } else { + pool = std::make_shared(); } std::vector column_names; @@ -60,7 +62,7 @@ std::shared_ptr
make_table_py(t_val table, t_data_accessor accessor, void * ptr = malloc(size); std::memcpy(ptr, bytes.cast().c_str(), size); { - PerspectiveScopedGILRelease acquire; + PerspectiveScopedGILRelease acquire(pool->get_event_loop_thread_id()); arrow_loader.initialize((uintptr_t)ptr, size); @@ -155,7 +157,6 @@ std::shared_ptr
make_table_py(t_val table, t_data_accessor accessor, } if (!table_initialized) { - pool = std::make_shared(); tbl = std::make_shared
(pool, column_names, data_types, limit, index); offset = 0; } @@ -178,7 +179,7 @@ std::shared_ptr
make_table_py(t_val table, t_data_accessor accessor, data_table.init(); std::uint32_t row_count; if (is_arrow) { - PerspectiveScopedGILRelease acquire; + PerspectiveScopedGILRelease acquire(pool->get_event_loop_thread_id()); row_count = arrow_loader.row_count(); data_table.extend(arrow_loader.row_count()); arrow_loader.fill_table(data_table, input_schema, index, offset, limit, is_update); diff --git a/python/perspective/perspective/src/utils.cpp b/python/perspective/perspective/src/utils.cpp index 613900ce02..1111a81fed 100644 --- a/python/perspective/perspective/src/utils.cpp +++ b/python/perspective/perspective/src/utils.cpp @@ -22,30 +22,6 @@ namespace perspective { namespace binding { -std::thread::id* event_loop_thread; - -PerspectiveScopedGILRelease::PerspectiveScopedGILRelease() : thread_state(0) { - if (event_loop_thread != NULL) { - if (std::this_thread::get_id() != *event_loop_thread) { - std::cerr << "Perspective called from wrong thread; Expected " << *event_loop_thread << "; Got " << std::this_thread::get_id() << std::endl; - } - thread_state = PyEval_SaveThread(); - } -} - -PerspectiveScopedGILRelease::~PerspectiveScopedGILRelease() { - if (event_loop_thread != NULL) { - if (std::this_thread::get_id() != *event_loop_thread) { - std::cerr << "Perspective called from wrong thread; Expected " << *event_loop_thread << "; Got " << std::this_thread::get_id() << std::endl; - } - PyEval_RestoreThread(thread_state); - } -} - -void _set_event_loop() { - event_loop_thread = new std::thread::id(std::this_thread::get_id()); -} - std::shared_ptr control = std::make_shared( tbb::global_control::max_allowed_parallelism, diff --git a/python/perspective/perspective/src/view.cpp b/python/perspective/perspective/src/view.cpp index 09ce90e243..cc00f323d7 100644 --- a/python/perspective/perspective/src/view.cpp +++ b/python/perspective/perspective/src/view.cpp @@ -243,7 +243,7 @@ make_view(std::shared_ptr
table, const std::string& name, const std::stri std::shared_ptr schema = std::make_shared(table->get_schema()); std::shared_ptr config = make_view_config(schema, date_parser, view_config); { - PerspectiveScopedGILRelease acquire; + PerspectiveScopedGILRelease acquire(table->get_pool()->get_event_loop_thread_id()); auto ctx = make_context(table, schema, config, name); auto view_ptr = std::make_shared>(table, ctx, name, separator, config); return view_ptr; @@ -276,7 +276,7 @@ to_arrow_zero( std::int32_t start_col, std::int32_t end_col ) { - PerspectiveScopedGILRelease acquire; + PerspectiveScopedGILRelease acquire(view->get_event_loop_thread_id()); std::shared_ptr str = view->to_arrow(start_row, end_row, start_col, end_col); return py::bytes(*str); @@ -290,7 +290,7 @@ to_arrow_one( std::int32_t start_col, std::int32_t end_col ) { - PerspectiveScopedGILRelease acquire; + PerspectiveScopedGILRelease acquire(view->get_event_loop_thread_id()); std::shared_ptr str = view->to_arrow(start_row, end_row, start_col, end_col); return py::bytes(*str); @@ -304,7 +304,7 @@ to_arrow_two( std::int32_t start_col, std::int32_t end_col ) { - PerspectiveScopedGILRelease acquire; + PerspectiveScopedGILRelease acquire(view->get_event_loop_thread_id()); std::shared_ptr str = view->to_arrow(start_row, end_row, start_col, end_col); return py::bytes(*str); @@ -312,7 +312,7 @@ to_arrow_two( py::bytes get_row_delta_zero(std::shared_ptr> view) { - PerspectiveScopedGILRelease acquire; + PerspectiveScopedGILRelease acquire(view->get_event_loop_thread_id()); std::shared_ptr> slice = view->get_row_delta(); std::shared_ptr arrow = view->data_slice_to_arrow(slice); return py::bytes(*arrow); @@ -320,7 +320,7 @@ get_row_delta_zero(std::shared_ptr> view) { py::bytes get_row_delta_one(std::shared_ptr> view) { - PerspectiveScopedGILRelease acquire; + PerspectiveScopedGILRelease acquire(view->get_event_loop_thread_id()); std::shared_ptr> slice = view->get_row_delta(); std::shared_ptr arrow = view->data_slice_to_arrow(slice); return py::bytes(*arrow); @@ -329,7 +329,7 @@ get_row_delta_one(std::shared_ptr> view) { py::bytes get_row_delta_two( std::shared_ptr> view) { - PerspectiveScopedGILRelease acquire; + PerspectiveScopedGILRelease acquire(view->get_event_loop_thread_id()); std::shared_ptr> slice = view->get_row_delta(); std::shared_ptr arrow = view->data_slice_to_arrow(slice); return py::bytes(*arrow); diff --git a/python/perspective/perspective/tests/core/test_async.py b/python/perspective/perspective/tests/core/test_async.py index dc9d7f439f..dcb1d5f1d1 100644 --- a/python/perspective/perspective/tests/core/test_async.py +++ b/python/perspective/perspective/tests/core/test_async.py @@ -5,57 +5,44 @@ # This file is part of the Perspective library, distributed under the terms of # the Apache License 2.0. The full license can be found in the LICENSE file. # + import random import six import tornado.ioloop +import queue +import threading + from functools import partial -from threading import Thread -from pytest import mark from perspective import Table, PerspectiveManager -from perspective.table._state import _PerspectiveStateManager - - -class AsyncSentinel(object): - """Check how many times the callback is actually being called - on the IOLoop.""" - - def __init__(self, value): - self.value = value - - def set(self, new): - self.value = new - def get(self): - return self.value +def syncify(f): + """Given a function `f` that must be run on `TestAsync.loop`, queue `f` on + the loop, block until it is evaluated, and return the result. + """ + sem = queue.Queue() -SENTINEL = AsyncSentinel(0) + def _syncify_task(): + assert threading.current_thread().ident == TestAsync.thread.ident + result = f() + TestAsync.loop.add_callback(lambda: sem.put(result)) + def _syncify(): + TestAsync.loop.add_callback(_syncify_task) + return sem.get() -def queue_process_async(loop, f, *args, **kwargs): - """Create our own `queue_process` method that uses a Tornado IOLoop.""" - SENTINEL.set(SENTINEL.get() + 1) - loop.add_callback(f, *args, **kwargs) - - -def queue_process_async_delay(loop, delay, f, *args, **kwargs): - """Create our own `queue_process` method that uses a Tornado IOLoop.""" - SENTINEL.set(SENTINEL.get() + 1) - loop.call_later(delay, f, *args, **kwargs) + return _syncify data = [{"a": i, "b": i * 0.5, "c": str(i)} for i in range(10)] -# @mark.skipif(six.PY2, reason="Requires Python 3") class TestAsync(object): - @classmethod def setup_class(cls): cls.loop = tornado.ioloop.IOLoop() cls.loop.make_current() - cls.wrapped_queue_process = partial(queue_process_async, cls.loop) - cls.thread = Thread(target=cls.loop.start) + cls.thread = threading.Thread(target=cls.loop.start) cls.thread.daemon = True cls.thread.start() @@ -73,371 +60,268 @@ def loop_is_running(cls): else: return cls.loop.asyncio_loop.is_running() - def setup_method(self): - global SENTINEL - SENTINEL = AsyncSentinel(0) - - def teardown_method(self): - global SENTINEL - SENTINEL = AsyncSentinel(0) - def test_async_queue_process(self): - tbl = Table({ - "a": int, - "b": float, - "c": str - }) + tbl = Table({"a": int, "b": float, "c": str}) manager = PerspectiveManager() - manager.set_loop_callback(TestAsync.wrapped_queue_process) + manager.set_loop_callback(TestAsync.loop.add_callback) manager.host(tbl) - assert tbl.size() == 0 - - for i in range(5): - tbl.update([data[i]]) - - # process should have been called at least once - assert SENTINEL.get() > 0 + @syncify + def _task(): + assert tbl.size() == 0 + for i in range(5): + tbl.update([data[i]]) + return tbl.size() + assert _task() == 5 tbl.delete() def test_async_multiple_managers_queue_process(self): - tbl = Table({ - "a": int, - "b": float, - "c": str - }) - tbl2 = Table({ - "a": int, - "b": float, - "c": str - }) + tbl = Table({"a": int, "b": float, "c": str}) + tbl2 = Table({"a": int, "b": float, "c": str}) + manager = PerspectiveManager() manager2 = PerspectiveManager() manager.host_table("tbl", tbl) manager2.host_table("tbl2", tbl2) - manager.set_loop_callback(TestAsync.wrapped_queue_process) - manager2.set_loop_callback(TestAsync.wrapped_queue_process) + manager.set_loop_callback(TestAsync.loop.add_callback) + manager2.set_loop_callback(TestAsync.loop.add_callback) - for i in range(5): - tbl.update([data[i]]) - tbl2.update([data[i]]) + @syncify + def _update_task(): + for i in range(5): + tbl.update([data[i]]) + tbl2.update([data[i]]) + return tbl.size() - assert SENTINEL.get() != 0 + assert _update_task() == 5 - # flush `TO_PROCESS` - view = tbl.view() - assert view.to_records() == data[:5] + @syncify + def _flush_to_process(): + view = tbl2.view() + records = view.to_records() + for i in range(5): + tbl2.update([data[i]]) - for i in range(5): - tbl2.update([data[i]]) + view.delete() + return records - view.delete() - tbl2.delete() - tbl.delete() + assert _flush_to_process() == data[:5] + + @syncify + def _delete_task(): + tbl2.delete() + tbl.delete() + + _delete_task() def test_async_multiple_managers_mixed_queue_process(self): - # mutate when synchronously calling queue_process for each update - SENTINEL_2 = AsyncSentinel(0) + sentinel = {"called": 0} def sync_queue_process(f, *args, **kwargs): - SENTINEL_2.set(SENTINEL_2.get() - 1) + sentinel["called"] += 1 f(*args, **kwargs) - tbl = Table({ - "a": int, - "b": float, - "c": str - }) - tbl2 = Table({ - "a": int, - "b": float, - "c": str - }) + tbl = Table({"a": int, "b": float, "c": str}) + tbl2 = Table({"a": int, "b": float, "c": str}) manager = PerspectiveManager() manager2 = PerspectiveManager() - manager.host_table("tbl", tbl) manager2.host_table("tbl2", tbl2) # manager uses tornado, manager2 is synchronous - manager.set_loop_callback(TestAsync.wrapped_queue_process) + manager.set_loop_callback(TestAsync.loop.add_callback) manager2.set_loop_callback(sync_queue_process) - tbl_id = tbl._table.get_id() - tbl2_id = tbl2._table.get_id() + @syncify + def _tbl_task(): + for i in range(5): + tbl.update([data[i]]) + return tbl.size() + + assert _tbl_task() == 5 for i in range(5): - tbl.update([data[i]]) tbl2.update([data[i]]) - assert SENTINEL.get() != 0 - assert SENTINEL_2.get() == -6 + assert sentinel["called"] == 6 + + @syncify + def _tbl_task2(): + view = tbl.view() + records = view.to_records() + view.delete() + tbl.delete() + return records - assert tbl2_id not in _PerspectiveStateManager.TO_PROCESS + assert _tbl_task2() == data[:5] - # flush `TO_PROCESS` - view = tbl.view() + view = tbl2.view() assert view.to_records() == data[:5] - assert tbl_id not in _PerspectiveStateManager.TO_PROCESS - tbl2.delete() view.delete() - tbl.delete() + tbl2.delete() def test_async_multiple_managers_delayed_process(self): - from time import sleep - short_delay_queue_process = partial(queue_process_async_delay, TestAsync.loop, 0.5) - long_delay_queue_process = partial(queue_process_async_delay, TestAsync.loop, 1) - - tbl = Table({ - "a": int, - "b": float, - "c": str - }) - tbl2 = Table({ - "a": int, - "b": float, - "c": str - }) + sentinel = {"async": 0, "sync": 0} + + def _counter(key, f, *args, **kwargs): + sentinel[key] += 1 + return f(*args, **kwargs) + + short_delay_queue_process = partial(_counter, "sync") + long_delay_queue_process = partial( + TestAsync.loop.add_timeout, 1, _counter, "async" + ) + + tbl = Table({"a": int, "b": float, "c": str}) + tbl2 = Table({"a": int, "b": float, "c": str}) + manager = PerspectiveManager() manager2 = PerspectiveManager() - manager.host_table("tbl", tbl) manager2.host_table("tbl2", tbl2) - # The guarantee of `queue_process` is that eventually `_process` - # will be called, either by user action or loop iteration. By adding - # the delay, we can artificially queue up actions for later execution - # and see that it's working properly. manager.set_loop_callback(short_delay_queue_process) manager2.set_loop_callback(long_delay_queue_process) - tbl_id = tbl._table.get_id() - tbl2_id = tbl2._table.get_id() + @syncify + def _tbl_task(): + for i in range(10): + tbl2.update([data[i]]) + _tbl_task() for i in range(10): tbl.update([data[i]]) - tbl2.update([data[i]]) - - assert SENTINEL.get() != 0 - # updates are now queued - assert tbl_id in _PerspectiveStateManager.TO_PROCESS - assert tbl2_id in _PerspectiveStateManager.TO_PROCESS + @syncify + def _tbl_task2(): + size = tbl2.size() + tbl2.delete() + return size - # Wait for the callbacks to run - we don't call any methods - # that would call `call_process`, but instead wait for the - # callbacks to execute asynchronously. - sleep(1) + assert _tbl_task2() == 10 + assert tbl.size() == 10 + assert sentinel["async"] == 2 + assert sentinel["sync"] == 11 - tbl2.delete() tbl.delete() def test_async_queue_process_multiple_ports(self): - tbl = Table({ - "a": int, - "b": float, - "c": str - }) - + tbl = Table({"a": int, "b": float, "c": str}) port_ids = [0] - port_data = [{ - "a": 0, - "b": 0, - "c": "0" - }] + port_data = [{"a": 0, "b": 0, "c": "0"}] for i in range(10): port_id = tbl.make_port() port_ids.append(port_id) - port_data.append({ - "a": port_id, - "b": port_id * 1.5, - "c": str(port_id) - }) + port_data.append({"a": port_id, "b": port_id * 1.5, "c": str(port_id)}) assert port_ids == list(range(0, 11)) - manager = PerspectiveManager() - manager.set_loop_callback(TestAsync.wrapped_queue_process) manager.host(tbl) + manager.set_loop_callback(TestAsync.loop.add_callback) - assert tbl.size() == 0 + assert syncify(lambda: tbl.size())() == 0 random.shuffle(port_ids) - for port_id in port_ids: - idx = port_id if port_id < len(port_ids) else len(port_ids) - 1 - tbl.update([port_data[idx]], port_id=port_id) - - # assert that process is being called asynchronously - assert SENTINEL.get() > 0 + @syncify + def _tbl_task(): + for port_id in port_ids: + idx = port_id if port_id < len(port_ids) else len(port_ids) - 1 + tbl.update([port_data[idx]], port_id=port_id) + size = tbl.size() + tbl.delete() + return size - tbl.delete() + assert len(port_ids) == 11 + assert _tbl_task() == 11 def test_async_multiple_managers_queue_process_multiple_ports(self): - tbl = Table({ - "a": int, - "b": float, - "c": str - }) - - tbl2 = Table({ - "a": int, - "b": float, - "c": str - }) - + tbl = Table({"a": int, "b": float, "c": str}) + tbl2 = Table({"a": int, "b": float, "c": str}) port_ids = [0] - port_data = [{ - "a": 0, - "b": 0, - "c": "0" - }] + port_data = [{"a": 0, "b": 0, "c": "0"}] for i in range(10): port_id = tbl.make_port() port_id2 = tbl2.make_port() - assert port_id == port_id2 - port_ids.append(port_id) - port_data.append({ - "a": port_id, - "b": port_id * 1.5, - "c": str(port_id) - }) + port_data.append({"a": port_id, "b": port_id * 1.5, "c": str(port_id)}) manager = PerspectiveManager() manager2 = PerspectiveManager() - manager.host_table("tbl", tbl) manager2.host_table("tbl2", tbl2) - manager.set_loop_callback(TestAsync.wrapped_queue_process) - manager2.set_loop_callback(TestAsync.wrapped_queue_process) + manager.set_loop_callback(TestAsync.loop.add_callback) + manager2.set_loop_callback(TestAsync.loop.add_callback) - random.shuffle(port_ids) - - for port_id in port_ids: - idx = port_id if port_id < len(port_ids) else len(port_ids) - 1 - tbl.update([port_data[idx]], port_id=port_id) - tbl2.update([port_data[idx]], port_id=port_id) + @syncify + def _task(): + random.shuffle(port_ids) + for port_id in port_ids: + idx = port_id if port_id < len(port_ids) else len(port_ids) - 1 + tbl.update([port_data[idx]], port_id=port_id) + tbl2.update([port_data[idx]], port_id=port_id) + return (tbl.size(), tbl2.size()) - # assert that process gets called at some point - assert SENTINEL.get() != 0 + assert _task() == (11, 11) def test_async_multiple_managers_mixed_queue_process_multiple_ports(self): - # mutate when synchronously calling queue_process for each update - SENTINEL_2 = AsyncSentinel(0) - - def sync_queue_process(f, *args, **kwargs): - SENTINEL_2.set(SENTINEL_2.get() - 1) - f(*args, **kwargs) - - tbl = Table({ - "a": int, - "b": float, - "c": str - }) + sentinel = {"async": 0, "sync": 0} - tbl2 = Table({ - "a": int, - "b": float, - "c": str - }) + def _counter(key, f, *args, **kwargs): + sentinel[key] += 1 + return f(*args, **kwargs) + sync_process = partial(_counter, "sync") + async_process = partial(TestAsync.loop.add_timeout, 1, _counter, "async") + tbl = Table({"a": int, "b": float, "c": str}) + tbl2 = Table({"a": int, "b": float, "c": str}) port_ids = [0] - port_data = [{ - "a": 0, - "b": 0, - "c": "0" - }] + port_data = [{"a": 0, "b": 0, "c": "0"}] for i in range(10): port_id = tbl.make_port() port_id2 = tbl2.make_port() - assert port_id == port_id2 - port_ids.append(port_id) - port_data.append({ - "a": port_id, - "b": port_id * 1.5, - "c": str(port_id) - }) + port_data.append({"a": port_id, "b": port_id * 1.5, "c": str(port_id)}) manager = PerspectiveManager() manager2 = PerspectiveManager() - manager.host_table("tbl", tbl) manager2.host_table("tbl2", tbl2) # manager uses tornado, manager2 is synchronous - manager.set_loop_callback(TestAsync.wrapped_queue_process) - manager2.set_loop_callback(sync_queue_process) - + manager.set_loop_callback(async_process) + manager2.set_loop_callback(sync_process) random.shuffle(port_ids) + @syncify + def _task(): + for port_id in port_ids: + idx = port_id if port_id < len(port_ids) else len(port_ids) - 1 + tbl.update([port_data[idx]], port_id=port_id) + + _task() for port_id in port_ids: idx = port_id if port_id < len(port_ids) else len(port_ids) - 1 - tbl.update([port_data[idx]], port_id=port_id) tbl2.update([port_data[idx]], port_id=port_id) - assert SENTINEL.get() != 0 - assert SENTINEL_2.get() == -12 - - tbl2.delete() - tbl.delete() - - def test_async_multiple_managers_delayed_process_multiple_ports(self): - from time import sleep - short_delay_queue_process = partial(queue_process_async_delay, TestAsync.loop, 0.5) - long_delay_queue_process = partial(queue_process_async_delay, TestAsync.loop, 1) - - tbl = Table({ - "a": int, - "b": float, - "c": str - }) - tbl2 = Table({ - "a": int, - "b": float, - "c": str - }) - manager = PerspectiveManager() - manager2 = PerspectiveManager() - - manager.host_table("tbl", tbl) - manager2.host_table("tbl2", tbl2) - - # The guarantee of `queue_process` is that eventually `_process` - # will be called, either by user action or loop iteration. By adding - # the delay, we can artificially queue up actions for later execution - # and see that it's working properly. - manager.set_loop_callback(short_delay_queue_process) - manager2.set_loop_callback(long_delay_queue_process) - - tbl_id = tbl._table.get_id() - tbl2_id = tbl2._table.get_id() - - for i in range(10): - tbl.update([data[i]]) - tbl2.update([data[i]]) - - assert SENTINEL.get() != 0 - - # updates are now queued - assert tbl_id in _PerspectiveStateManager.TO_PROCESS - assert tbl2_id in _PerspectiveStateManager.TO_PROCESS - - # Wait for the callbacks to run - we don't call any methods - # that would call `call_process`, but instead wait for the - # callbacks to execute asynchronously. - sleep(1) + @syncify + def _get_size(): + size = tbl.size() + tbl.delete() + return size + assert _get_size() == 11 + assert tbl2.size() == 11 + assert sentinel["async"] == 2 + assert sentinel["sync"] == 12 tbl2.delete() - tbl.delete()