Skip to content

Commit

Permalink
Release GIL for table(), table.update(), table.remove(), async …
Browse files Browse the repository at this point in the history
…conflated updates
  • Loading branch information
texodus committed Sep 21, 2020
1 parent e21b5a1 commit c1bbbf4
Show file tree
Hide file tree
Showing 21 changed files with 312 additions and 370 deletions.
1 change: 1 addition & 0 deletions cpp/perspective/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ set (SOURCE_FILES
${PSP_CPP_SRC}/src/cpp/pool.cpp
${PSP_CPP_SRC}/src/cpp/port.cpp
${PSP_CPP_SRC}/src/cpp/process_state.cpp
${PSP_CPP_SRC}/src/cpp/pyutils.cpp
${PSP_CPP_SRC}/src/cpp/raii.cpp
${PSP_CPP_SRC}/src/cpp/raii_impl_linux.cpp
${PSP_CPP_SRC}/src/cpp/raii_impl_osx.cpp
Expand Down
18 changes: 16 additions & 2 deletions cpp/perspective/src/cpp/gnode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
#include <perspective/logtime.h>
#include <perspective/utils.h>

#ifdef PSP_ENABLE_PYTHON
#include <perspective/pyutils.h>
#endif

namespace perspective {

t_tscalar
Expand Down Expand Up @@ -250,7 +254,7 @@ t_gnode::_process_table(t_uindex port_id) {
return result;
}

std::shared_ptr<t_port> input_port = m_input_ports[port_id];
std::shared_ptr<t_port>& input_port = m_input_ports[port_id];

if (input_port->get_table()->size() == 0) {
return result;
Expand Down Expand Up @@ -288,7 +292,7 @@ t_gnode::_process_table(t_uindex port_id) {
// contexts obliquely read gnode state at various points.
_update_contexts_from_state(flattened);

release_inputs();
input_port->release();
release_outputs();

#ifdef PSP_GNODE_VERIFY
Expand Down Expand Up @@ -593,6 +597,9 @@ bool
t_gnode::process(t_uindex port_id) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "Cannot `process` on an uninited gnode.");
#ifdef PSP_ENABLE_PYTHON
PerspectiveScopedGILRelease acquire(m_event_loop_thread_id);
#endif

t_process_table_result result = _process_table(port_id);

Expand Down Expand Up @@ -1347,6 +1354,13 @@ t_gnode::repr() const {
return ss.str();
}

#ifdef PSP_ENABLE_PYTHON
void
t_gnode::set_event_loop_thread_id(std::thread::id id) {
m_event_loop_thread_id = id;
}
#endif

void
t_gnode::register_context(const std::string& name, std::shared_ptr<t_ctx0> ctx) {
_register_context(name, ZERO_SIDED_CONTEXT, reinterpret_cast<std::int64_t>(ctx.get()));
Expand Down
33 changes: 23 additions & 10 deletions cpp/perspective/src/cpp/pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
#include <perspective/update_task.h>
#include <perspective/compat.h>
#include <perspective/env_vars.h>
#ifdef PSP_ENABLE_PYTHON
#include <thread>
#endif
#include <chrono>

namespace perspective {
Expand Down Expand Up @@ -48,6 +50,7 @@ empty_callback() {

t_pool::t_pool()
: m_update_delegate(empty_callback())
, m_event_loop_thread_id(std::thread::id())
, m_sleep(0) {
m_run.clear();
}
Expand Down Expand Up @@ -83,6 +86,11 @@ t_pool::register_gnode(t_gnode* node) {
t_uindex id = m_gnodes.size() - 1;
node->set_id(id);
node->set_pool_cleanup([this, id]() { this->m_gnodes[id] = 0; });
#ifdef PSP_ENABLE_PYTHON
if (m_event_loop_thread_id != std::thread::id()) {
node->set_event_loop_thread_id(m_event_loop_thread_id);
}
#endif

if (t_env::log_progress()) {
std::cout << "t_pool.register_gnode node => " << node << " rv => " << id << std::endl;
Expand Down Expand Up @@ -124,27 +132,32 @@ t_pool::send(t_uindex gnode_id, t_uindex port_id, const t_data_table& table) {
}
}

#ifdef PSP_ENABLE_PYTHON
void t_pool::set_event_loop() {
m_event_loop_thread_id = std::this_thread::get_id();
for (auto node : m_gnodes) {
node->set_event_loop_thread_id(m_event_loop_thread_id);
}
}

std::thread::id t_pool::get_event_loop_thread_id() const {
return m_event_loop_thread_id;
}
#endif

void
t_pool::_process_helper() {
t_pool::_process() {
auto work_to_do = m_data_remaining.load();
if (work_to_do) {
t_update_task task(*this);
task.run();
}
}

void
t_pool::_process() {
t_uindex sleep_time = m_sleep.load();
_process_helper();
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time));
}

void
t_pool::stop() {
m_run.clear(std::memory_order_release);
_process_helper();

_process();
if (t_env::log_progress()) {
std::cout << "t_pool.stop" << std::endl;
}
Expand Down
43 changes: 16 additions & 27 deletions cpp/perspective/src/cpp/pyutils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,28 @@
*/

#include <perspective/first.h>
#include <perspective/base.h>
#include <perspective/pyutils.h>
#include <frameobject.h>
#include <iostream>
#include <perspective/pythonhelpers.h>

#ifdef PSP_ENABLE_PYTHON
namespace perspective {

bool
curthread_has_gil() {
auto tstate = _PyThreadState_Current;
return tstate && (tstate == PyGILState_GetThisThreadState());
}

void
print_python_stack() {
auto tstate = PyThreadState_GET();
if (0 != tstate && 0 != tstate->frame) {
auto f = tstate->frame;
std::cout << "Python stack trace:" << std::endl;
while (0 != f) {
auto line = PyCode_Addr2Line(f->f_code, f->f_lasti);
auto filename = PyString_AsString(f->f_code->co_filename);
auto funcname = PyString_AsString(f->f_code->co_name);
std::cout << "\t" << filename << ":" << line << " : " << funcname << std::endl;
f = f->f_back;
PerspectiveScopedGILRelease::PerspectiveScopedGILRelease(std::thread::id event_loop_thread_id) : m_thread_state(NULL) {
if (event_loop_thread_id != std::thread::id()) {
if (std::this_thread::get_id() != event_loop_thread_id) {
std::stringstream err;
err << "Perspective called from wrong thread; Expected " << event_loop_thread_id << "; Got " << std::this_thread::get_id() << std::endl;
PSP_COMPLAIN_AND_ABORT(err.str());
}
m_thread_state = PyEval_SaveThread();
}
}

std::string
repr(PyObject* pyo) {
PyObjectPtr repr(PyObject_Repr(pyo));
return std::string(PyString_AsString(repr.get()));
PerspectiveScopedGILRelease::~PerspectiveScopedGILRelease() {
if (m_thread_state != NULL) {
PyEval_RestoreThread(m_thread_state);
}
}

} // end namespace perspective
} // end namespace perspective

#endif
8 changes: 8 additions & 0 deletions cpp/perspective/src/cpp/view.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,14 @@ View<CTX_T>::is_column_only() const {
return m_view_config->is_column_only();
}

#ifdef PSP_ENABLE_PYTHON
template <typename CTX_T>
std::thread::id
View<CTX_T>::get_event_loop_thread_id() const {
return m_table->get_pool()->get_event_loop_thread_id();
};
#endif

/******************************************************************************
*
* Private
Expand Down
11 changes: 11 additions & 0 deletions cpp/perspective/src/include/perspective/gnode.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
#include <perspective/computed_column_map.h>
#include <perspective/computed_function.h>
#include <tsl/ordered_map.h>
#ifdef PSP_ENABLE_PYTHON
#include <thread>
#endif
#ifdef PSP_PARALLEL_FOR
#include <tbb/parallel_sort.h>
#include <tbb/tbb.h>
Expand Down Expand Up @@ -190,6 +193,10 @@ class PERSPECTIVE_EXPORT t_gnode {
void pprint() const;
std::string repr() const;

#ifdef PSP_ENABLE_PYTHON
void set_event_loop_thread_id(std::thread::id id);
#endif

protected:
/**
* @brief Given `tbl`, notify each registered context with `tbl`.
Expand Down Expand Up @@ -390,6 +397,10 @@ class PERSPECTIVE_EXPORT t_gnode {
std::vector<t_custom_column> m_custom_columns;
std::function<void()> m_pool_cleanup;
bool m_was_updated;

#ifdef PSP_ENABLE_PYTHON
std::thread::id m_event_loop_thread_id;
#endif
};

/**
Expand Down
14 changes: 13 additions & 1 deletion cpp/perspective/src/include/perspective/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
#include <mutex>
#include <atomic>

#ifdef PSP_ENABLE_PYTHON
#include <thread>
#endif

#if defined PSP_ENABLE_WASM
#include <emscripten/val.h>
typedef emscripten::val t_val;
Expand Down Expand Up @@ -57,6 +61,11 @@ class PERSPECTIVE_EXPORT t_pool {
t_uindex gnode_id, const std::string& name, t_ctx_type type, std::int64_t ptr);
#endif

#ifdef PSP_ENABLE_PYTHON
void set_event_loop();
std::thread::id get_event_loop_thread_id() const;
#endif

/**
* @brief Call the binding language's `update_callback` method,
* set at initialize time.
Expand All @@ -74,7 +83,7 @@ class PERSPECTIVE_EXPORT t_pool {
void send(t_uindex gnode_id, t_uindex port_id, const t_data_table& table);

void _process();
void _process_helper();

void init();
void stop();
void set_sleep(t_uindex ms);
Expand Down Expand Up @@ -102,6 +111,9 @@ class PERSPECTIVE_EXPORT t_pool {
bool validate_gnode_id(t_uindex gnode_id) const;

private:
#ifdef PSP_ENABLE_PYTHON
std::thread::id m_event_loop_thread_id;
#endif
std::mutex m_mtx;
std::vector<t_gnode*> m_gnodes;

Expand Down
30 changes: 30 additions & 0 deletions cpp/perspective/src/include/perspective/pyutils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/******************************************************************************
*
* Copyright (c) 2017, the Perspective Authors.
*
* This file is part of the Perspective library, distributed under the terms of
* the Apache License 2.0. The full license can be found in the LICENSE file.
*
*/

#pragma once
#include <perspective/first.h>

#ifdef PSP_ENABLE_PYTHON
#include <thread>

namespace perspective {

class PerspectiveScopedGILRelease {
public:
PerspectiveScopedGILRelease(std::thread::id event_loop_thread_id);
~PerspectiveScopedGILRelease();
private:
PyThreadState* m_thread_state;
};


void _set_event_loop();

} // namespace perspective
#endif
6 changes: 6 additions & 0 deletions cpp/perspective/src/include/perspective/view.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <cstddef>
#include <memory>
#include <map>
#ifdef PSP_ENABLE_PYTHON
#include <thread>
#endif

namespace perspective {

Expand Down Expand Up @@ -224,6 +227,9 @@ class PERSPECTIVE_EXPORT View {
t_stepdelta get_step_delta(t_index bidx, t_index eidx) const;
t_dtype get_column_dtype(t_uindex idx) const;
bool is_column_only() const;
#ifdef PSP_ENABLE_PYTHON
std::thread::id get_event_loop_thread_id() const;
#endif

private:
/**
Expand Down
3 changes: 2 additions & 1 deletion python/perspective/perspective/include/perspective/python.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <perspective/binding.h>
#include <perspective/exception.h>
#include <perspective/exports.h>
#include <perspective/pyutils.h>
#include <perspective/python/accessor.h>
#include <perspective/python/base.h>
#include <perspective/python/computed.h>
Expand Down Expand Up @@ -255,6 +256,7 @@ PYBIND11_MODULE(libbinding, m)
.def(py::init<>())
.def("set_update_delegate", &t_pool::set_update_delegate)
.def("unregister_gnode", &t_pool::unregister_gnode)
.def("set_event_loop", &t_pool::set_event_loop)
.def("_process", &t_pool::_process);

/******************************************************************************
Expand Down Expand Up @@ -384,7 +386,6 @@ PYBIND11_MODULE(libbinding, m)
m.def("make_computations", &make_computations);
m.def("scalar_to_py", &scalar_to_py);
m.def("_set_nthreads", &_set_nthreads);
m.def("_set_event_loop", &_set_event_loop);
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <perspective/base.h>
#include <perspective/binding.h>
#include <perspective/pyutils.h>
#include <perspective/python/base.h>
#include <perspective/python/utils.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <perspective/base.h>
#include <perspective/binding.h>
#include <perspective/pyutils.h>
#include <perspective/python/base.h>

namespace perspective {
Expand Down
10 changes: 0 additions & 10 deletions python/perspective/perspective/include/perspective/python/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,6 @@
namespace perspective {
namespace binding {

class PerspectiveScopedGILRelease {
public:
PerspectiveScopedGILRelease();
~PerspectiveScopedGILRelease();
private:
PyThreadState* thread_state;
};


void _set_event_loop();

void _set_nthreads(int nthreads);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <perspective/base.h>
#include <perspective/binding.h>
#include <perspective/pyutils.h>
#include <perspective/python/base.h>
#include <perspective/python/utils.h>

Expand Down
Loading

0 comments on commit c1bbbf4

Please sign in to comment.