Skip to content

Commit

Permalink
[python, C++] Re-indexer context thread pool
Browse files Browse the repository at this point in the history
Each C++ SOMAContext has it's own lazily created thread pool
  • Loading branch information
beroy committed Feb 23, 2024
1 parent 568d2e4 commit 8b72dfb
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 41 deletions.
4 changes: 2 additions & 2 deletions apis/python/src/tiledbsoma/_index_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def tiledbsoma_build_index(
tdb_concurrency = int(
context.tiledb_ctx.config().get("sm.compute_concurrency_level", 10)
)
thread_count = max(1, tdb_concurrency // 2)
max(1, tdb_concurrency // 2)

reindexer = clib.IntIndexer()
reindexer.map_locations(keys, thread_count)
reindexer.map_locations(keys, context)
return reindexer # type: ignore[no-any-return]
13 changes: 3 additions & 10 deletions apis/python/src/tiledbsoma/reindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,16 @@ void load_reindexer(py::module &m) {
// between 0 and number of keys - 1) based on khash
py::class_<IntIndexer>(m, "IntIndexer")
.def(py::init<>())
.def(py::init<std::vector<int64_t>&, int>())
.def(py::init<std::vector<int64_t>&, std::shared_ptr<SOMAContext>>())
.def(
"map_locations",
[](IntIndexer& indexer,
py::array_t<int64_t> keys,
int num_threads) {
std::shared_ptr<SOMAContext> context) {
auto buffer = keys.request();
int64_t* data = static_cast<int64_t*>(buffer.ptr);
size_t length = buffer.shape[0];
indexer.map_locations(keys.data(), keys.size(), num_threads);
})
.def(
"map_locations",
[](IntIndexer& indexer,
std::vector<int64_t> keys,
int num_threads) {
indexer.map_locations(keys.data(), keys.size(), num_threads);
indexer.map_locations(keys.data(), keys.size(), context);
})
// Perform lookup for a large input array of keys and return the looked
// up value array (passing ownership from C++ to python)
Expand Down
51 changes: 34 additions & 17 deletions libtiledbsoma/src/reindexer/reindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,15 @@ KHASH_MAP_INIT_INT64(m64, int64_t)
namespace tiledbsoma {

void IntIndexer::map_locations(
const int64_t* keys, size_t size, size_t threads) {
const int64_t* keys,
size_t size,
std::shared_ptr<tiledbsoma::SOMAContext> context) {
map_size_ = size;

// Handling edge cases
if (size == 0) {
return;
}
if (threads == 0) {
throw std::runtime_error("Re-indexer thread count cannot be zero.");
}

hash_ = kh_init(m64);
kh_resize(m64, hash_, size * 1.25);
Expand All @@ -65,9 +64,8 @@ void IntIndexer::map_locations(
int64_t counter = 0;
// Hash map construction
LOG_DEBUG(fmt::format(
"[Re-indexer] Start of Map locations with {} keys and {} threads",
size,
threads));
"[Re-indexer] Start of Map locations with {} keys",
size));
for (size_t i = 0; i < size; i++) {
k = kh_put(m64, hash_, keys[i], &ret);
assert(k != kh_end(hash_));
Expand All @@ -79,18 +77,33 @@ void IntIndexer::map_locations(
}
auto hsize = kh_size(hash_);
LOG_DEBUG(fmt::format("[Re-indexer] khash size = {}", hsize));
if (threads > 1) {
tiledb_thread_pool_ = std::make_unique<tiledbsoma::ThreadPool>(threads);
}

LOG_DEBUG(
fmt::format("[Re-indexer] Thread pool started and hash table created"));
context_ = context;
}

void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
if (size == 0) {
return;
}
if (tiledb_thread_pool_ == nullptr) { // When concurrency is 1
// Creating threadpool lazily when the first lookup happens
if (context_ != nullptr && context_->thread_pool() == nullptr) {
// Extracting concurrency from tiledb config
auto cfg = context_->tiledb_config();
int concurrency = 10;
if (cfg.find("sm.compute_concurrency_level") != cfg.end()) {
concurrency = std::stoi(cfg["sm.compute_concurrency_level"]);
}
int thread_count = std::max(1, concurrency / 2);
if (thread_count > 1) {
context_->thread_pool() = std::make_shared<tiledbsoma::ThreadPool>(
thread_count);
}
}
// Single thread checks
if (context_ == nullptr || context_->thread_pool() == nullptr ||
context_->thread_pool()->concurrency_level() == 1) {
for (int i = 0; i < size; i++) {
auto k = kh_get(m64, hash_, keys[i]);
if (k == kh_end(hash_)) {
Expand All @@ -104,12 +117,13 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
}
LOG_DEBUG(fmt::format(
"Lookup with thread concurrency {} on data size {}",
tiledb_thread_pool_->concurrency_level(),
context_->thread_pool()->concurrency_level(),
size));

std::vector<tiledbsoma::ThreadPool::Task> tasks;

size_t thread_chunk_size = size / tiledb_thread_pool_->concurrency_level();
size_t thread_chunk_size = size /
context_->thread_pool()->concurrency_level();
if (thread_chunk_size == 0) {
thread_chunk_size = 1;
}
Expand All @@ -122,7 +136,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
}
LOG_DEBUG(fmt::format(
"Creating tileDB task for the range from {} to {} ", start, end));
tiledbsoma::ThreadPool::Task task = tiledb_thread_pool_->execute(
tiledbsoma::ThreadPool::Task task = context_->thread_pool()->execute(
[this, start, end, &results, &keys]() {
for (size_t i = start; i < end; i++) {
auto k = kh_get(m64, hash_, keys[i]);
Expand All @@ -142,7 +156,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
start,
end));
}
tiledb_thread_pool_->wait_all(tasks);
context_->thread_pool()->wait_all(tasks);
}

IntIndexer::~IntIndexer() {
Expand All @@ -151,8 +165,11 @@ IntIndexer::~IntIndexer() {
}
}

IntIndexer::IntIndexer(const int64_t* keys, int size, int threads) {
map_locations(keys, size, threads);
IntIndexer::IntIndexer(
const int64_t* keys,
int size,
std::shared_ptr<tiledbsoma::SOMAContext> context) {
map_locations(keys, size, context);
}

} // namespace tiledbsoma
29 changes: 19 additions & 10 deletions libtiledbsoma/src/reindexer/reindexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct kh_m64_s;
namespace tiledbsoma {

class ThreadPool;
class SOMAContext;

class IntIndexer {
public:
Expand All @@ -53,9 +54,14 @@ class IntIndexer {
* @param size yhr number of keys in the put
* @param threads number of threads in the thread pool
*/
void map_locations(const int64_t* keys, size_t size, size_t threads = 4);
void map_locations(const std::vector<int64_t>& keys, size_t threads = 4) {
map_locations(keys.data(), keys.size(), threads);
void map_locations(
const int64_t* keys,
size_t size,
std::shared_ptr<tiledbsoma::SOMAContext> context);
void map_locations(
const std::vector<int64_t>& keys,
std::shared_ptr<tiledbsoma::SOMAContext> context) {
map_locations(keys.data(), keys.size(), context);
}
/**
* Used for parallel lookup using khash
Expand All @@ -77,9 +83,14 @@ class IntIndexer {
/**
* Constructor with the same arguments as map_locations
*/
IntIndexer(const int64_t* keys, int size, int threads);
IntIndexer(const std::vector<int64_t>& keys, int threads)
: IntIndexer(keys.data(), keys.size(), threads) {
IntIndexer(
const int64_t* keys,
int size,
std::shared_ptr<tiledbsoma::SOMAContext> context);
IntIndexer(
const std::vector<int64_t>& keys,
std::shared_ptr<tiledbsoma::SOMAContext> context)
: IntIndexer(keys.data(), keys.size(), context) {
}
virtual ~IntIndexer();

Expand All @@ -88,10 +99,8 @@ class IntIndexer {
* The created 64bit hash table
*/
kh_m64_s* hash_;
/*
* TileDB threadpool
*/
std::shared_ptr<tiledbsoma::ThreadPool> tiledb_thread_pool_ = nullptr;

std::shared_ptr<SOMAContext> context_ = nullptr;
/*
* Number of elements in the map set by map_locations
*/
Expand Down
1 change: 0 additions & 1 deletion libtiledbsoma/src/soma/soma_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ class SOMAArray : public SOMAObject {
//===================================================================
//= public non-static
//===================================================================

/**
* @brief Construct a new SOMAArray object
*
Expand Down
8 changes: 8 additions & 0 deletions libtiledbsoma/src/soma/soma_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <tiledb/tiledb>

namespace tiledbsoma {
class ThreadPool;

using namespace tiledb;

Expand Down Expand Up @@ -67,13 +68,20 @@ class SOMAContext {
return cfg;
}

std::shared_ptr<ThreadPool>& thread_pool() {
return thread_pool_;
}

private:
//===================================================================
//= private non-static
//===================================================================

// TileDB context
std::shared_ptr<Context> ctx_;

// Threadpool
std::shared_ptr<ThreadPool> thread_pool_ = nullptr;
};
} // namespace tiledbsoma

Expand Down
5 changes: 4 additions & 1 deletion libtiledbsoma/test/test_indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
*/

#include <reindexer/reindexer.h>
#include <soma/soma_context.h>
#include <catch2/catch_test_macros.hpp>
#include <cstdint>
#include <memory>
#include <string>
#include <tiledb/tiledb>
#include <unordered_map>
Expand All @@ -57,9 +59,10 @@ bool run_test(int id, std::vector<int64_t> keys, std::vector<int64_t> lookups) {
try {
std::vector<int64_t> indexer_results;
indexer_results.resize(lookups.size());
auto context = std::make_shared<tiledbsoma::SOMAContext>();

tiledbsoma::IntIndexer indexer;
indexer.map_locations(keys);
indexer.map_locations(keys, context);
auto* hash = kh_init(m64);
int ret;
khint64_t k;
Expand Down

0 comments on commit 8b72dfb

Please sign in to comment.