Skip to content

Commit

Permalink
[python, C++] Re-indexer context thread pool
Browse files Browse the repository at this point in the history
Each C++ SOMAContext has it's own lazily created thread pool
  • Loading branch information
beroy committed Feb 23, 2024
1 parent 83e51e3 commit bc0e22a
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 45 deletions.
8 changes: 1 addition & 7 deletions apis/python/src/tiledbsoma/_index_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ def tiledbsoma_build_index(
Lifecycle:
Experimental.
"""
if context is not None:
tdb_concurrency = int(
context.tiledb_ctx.config().get("sm.compute_concurrency_level", 10)
)
thread_count = max(1, tdb_concurrency // 2)

reindexer = clib.IntIndexer()
reindexer.map_locations(keys, thread_count)
reindexer.map_locations(keys, context)
return reindexer # type: ignore[no-any-return]
10 changes: 5 additions & 5 deletions apis/python/src/tiledbsoma/reindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,23 @@ void load_reindexer(py::module &m) {
// between 0 and number of keys - 1) based on khash
py::class_<IntIndexer>(m, "IntIndexer")
.def(py::init<>())
.def(py::init<std::vector<int64_t>&, int>())
.def(py::init<std::vector<int64_t>&, std::shared_ptr<SOMAContext>>())
.def(
"map_locations",
[](IntIndexer& indexer,
py::array_t<int64_t> keys,
int num_threads) {
std::shared_ptr<SOMAContext> context) {
auto buffer = keys.request();
int64_t* data = static_cast<int64_t*>(buffer.ptr);
size_t length = buffer.shape[0];
indexer.map_locations(keys.data(), keys.size(), num_threads);
indexer.map_locations(keys.data(), keys.size(), context);
})
.def(
"map_locations",
[](IntIndexer& indexer,
std::vector<int64_t> keys,
int num_threads) {
indexer.map_locations(keys.data(), keys.size(), num_threads);
std::shared_ptr<SOMAContext> context) {
indexer.map_locations(keys.data(), keys.size(), context);
})
// Perform lookup for a large input array of keys and return the looked
// up value array (passing ownership from C++ to python)
Expand Down
4 changes: 2 additions & 2 deletions apis/python/tests/test_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_duplicate_key_indexer_error(
):
context = _validate_soma_tiledb_context(SOMATileDBContext())
with pytest.raises(RuntimeError, match="There are duplicate keys."):
tiledbsoma_build_index(keys, context=context)
tiledbsoma_build_index(keys, context=context.native_context)

pd_index = pd.Index(keys)
with pytest.raises(pd.errors.InvalidIndexError):
Expand Down Expand Up @@ -65,7 +65,7 @@ def test_duplicate_key_indexer_error(
)
def test_indexer(keys: np.array, lookups: np.array):
context = _validate_soma_tiledb_context(SOMATileDBContext())
indexer = tiledbsoma_build_index(keys, context=context)
indexer = tiledbsoma_build_index(keys, context=context.native_context)
results = indexer.get_indexer(lookups)
panda_indexer = pd.Index(keys)
panda_results = panda_indexer.get_indexer(lookups)
Expand Down
1 change: 1 addition & 0 deletions libtiledbsoma/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ add_library(TILEDB_SOMA_OBJECTS OBJECT
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_collection.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_experiment.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_measurement.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_context.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_dataframe.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_dense_ndarray.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_sparse_ndarray.cc
Expand Down
38 changes: 20 additions & 18 deletions libtiledbsoma/src/reindexer/reindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,24 @@ KHASH_MAP_INIT_INT64(m64, int64_t)
namespace tiledbsoma {

void IntIndexer::map_locations(
const int64_t* keys, size_t size, size_t threads) {
const int64_t* keys,
size_t size,
std::shared_ptr<tiledbsoma::SOMAContext> context) {
map_size_ = size;

// Handling edge cases
if (size == 0) {
return;
}
if (threads == 0) {
throw std::runtime_error("Re-indexer thread count cannot be zero.");
}

hash_ = kh_init(m64);
kh_resize(m64, hash_, size * 1.25);
int ret;
khint64_t k;
int64_t counter = 0;
// Hash map construction
LOG_DEBUG(fmt::format(
"[Re-indexer] Start of Map locations with {} keys and {} threads",
size,
threads));
LOG_DEBUG(
fmt::format("[Re-indexer] Start of Map locations with {} keys", size));
for (size_t i = 0; i < size; i++) {
k = kh_put(m64, hash_, keys[i], &ret);
assert(k != kh_end(hash_));
Expand All @@ -79,18 +76,19 @@ void IntIndexer::map_locations(
}
auto hsize = kh_size(hash_);
LOG_DEBUG(fmt::format("[Re-indexer] khash size = {}", hsize));
if (threads > 1) {
tiledb_thread_pool_ = std::make_unique<tiledbsoma::ThreadPool>(threads);
}

LOG_DEBUG(
fmt::format("[Re-indexer] Thread pool started and hash table created"));
context_ = context;
}

void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
if (size == 0) {
return;
}
if (tiledb_thread_pool_ == nullptr) { // When concurrency is 1
// Single thread checks
if (context_ == nullptr || context_->thread_pool() == nullptr ||
context_->thread_pool()->concurrency_level() == 1) {
for (int i = 0; i < size; i++) {
auto k = kh_get(m64, hash_, keys[i]);
if (k == kh_end(hash_)) {
Expand All @@ -104,12 +102,13 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
}
LOG_DEBUG(fmt::format(
"Lookup with thread concurrency {} on data size {}",
tiledb_thread_pool_->concurrency_level(),
context_->thread_pool()->concurrency_level(),
size));

std::vector<tiledbsoma::ThreadPool::Task> tasks;

size_t thread_chunk_size = size / tiledb_thread_pool_->concurrency_level();
size_t thread_chunk_size = size /
context_->thread_pool()->concurrency_level();
if (thread_chunk_size == 0) {
thread_chunk_size = 1;
}
Expand All @@ -122,7 +121,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
}
LOG_DEBUG(fmt::format(
"Creating tileDB task for the range from {} to {} ", start, end));
tiledbsoma::ThreadPool::Task task = tiledb_thread_pool_->execute(
tiledbsoma::ThreadPool::Task task = context_->thread_pool()->execute(
[this, start, end, &results, &keys]() {
for (size_t i = start; i < end; i++) {
auto k = kh_get(m64, hash_, keys[i]);
Expand All @@ -142,7 +141,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
start,
end));
}
tiledb_thread_pool_->wait_all(tasks);
context_->thread_pool()->wait_all(tasks);
}

IntIndexer::~IntIndexer() {
Expand All @@ -151,8 +150,11 @@ IntIndexer::~IntIndexer() {
}
}

IntIndexer::IntIndexer(const int64_t* keys, int size, int threads) {
map_locations(keys, size, threads);
IntIndexer::IntIndexer(
const int64_t* keys,
int size,
std::shared_ptr<tiledbsoma::SOMAContext> context) {
map_locations(keys, size, context);
}

} // namespace tiledbsoma
29 changes: 19 additions & 10 deletions libtiledbsoma/src/reindexer/reindexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct kh_m64_s;
namespace tiledbsoma {

class ThreadPool;
class SOMAContext;

class IntIndexer {
public:
Expand All @@ -53,9 +54,14 @@ class IntIndexer {
* @param size yhr number of keys in the put
* @param threads number of threads in the thread pool
*/
void map_locations(const int64_t* keys, size_t size, size_t threads = 4);
void map_locations(const std::vector<int64_t>& keys, size_t threads = 4) {
map_locations(keys.data(), keys.size(), threads);
void map_locations(
const int64_t* keys,
size_t size,
std::shared_ptr<tiledbsoma::SOMAContext> context);
void map_locations(
const std::vector<int64_t>& keys,
std::shared_ptr<tiledbsoma::SOMAContext> context) {
map_locations(keys.data(), keys.size(), context);
}
/**
* Used for parallel lookup using khash
Expand All @@ -77,9 +83,14 @@ class IntIndexer {
/**
* Constructor with the same arguments as map_locations
*/
IntIndexer(const int64_t* keys, int size, int threads);
IntIndexer(const std::vector<int64_t>& keys, int threads)
: IntIndexer(keys.data(), keys.size(), threads) {
IntIndexer(
const int64_t* keys,
int size,
std::shared_ptr<tiledbsoma::SOMAContext> context);
IntIndexer(
const std::vector<int64_t>& keys,
std::shared_ptr<tiledbsoma::SOMAContext> context)
: IntIndexer(keys.data(), keys.size(), context) {
}
virtual ~IntIndexer();

Expand All @@ -88,10 +99,8 @@ class IntIndexer {
* The created 64bit hash table
*/
kh_m64_s* hash_;
/*
* TileDB threadpool
*/
std::shared_ptr<tiledbsoma::ThreadPool> tiledb_thread_pool_ = nullptr;

std::shared_ptr<SOMAContext> context_ = nullptr;
/*
* Number of elements in the map set by map_locations
*/
Expand Down
49 changes: 49 additions & 0 deletions libtiledbsoma/src/soma/soma_context.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* @file soma_dataframe.h
*
* @section LICENSE
*
* The MIT License
*
* @copyright Copyright (c) 2023 TileDB, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* @section DESCRIPTION
*
* This file defines the SOMAContext class.
*/
#include "soma_context.h"
#include <thread_pool/thread_pool.h>

namespace tiledbsoma {

void SOMAContext::create_thread_pool() {
// Extracting concurrency from tiledb config
auto cfg = tiledb_config();
int concurrency = 10;
if (cfg.find("sm.compute_concurrency_level") != cfg.end()) {
concurrency = std::stoi(cfg["sm.compute_concurrency_level"]);
}
int thread_count = std::max(1, concurrency / 2);
if (thread_count > 1) {
thread_pool() = std::make_shared<ThreadPool>(thread_count);
}
}
} // namespace tiledbsoma
18 changes: 16 additions & 2 deletions libtiledbsoma/src/soma/soma_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <tiledb/tiledb>

namespace tiledbsoma {
class ThreadPool;

using namespace tiledb;

Expand All @@ -47,10 +48,14 @@ class SOMAContext {
//= public non-static
//===================================================================
SOMAContext()
: ctx_(std::make_shared<Context>(Config({}))){};
: ctx_(std::make_shared<Context>(Config({}))) {
create_thread_pool();
};

SOMAContext(std::map<std::string, std::string> platform_config)
: ctx_(std::make_shared<Context>(Config(platform_config))){};
: ctx_(std::make_shared<Context>(Config(platform_config))) {
create_thread_pool();
};

bool operator==(const SOMAContext& other) const {
return ctx_ == other.ctx_;
Expand All @@ -67,13 +72,22 @@ class SOMAContext {
return cfg;
}

std::shared_ptr<ThreadPool>& thread_pool() {
return thread_pool_;
}

private:
//===================================================================
//= private non-static
//===================================================================

void create_thread_pool();

// TileDB context
std::shared_ptr<Context> ctx_;

// Threadpool
std::shared_ptr<ThreadPool> thread_pool_ = nullptr;
};
} // namespace tiledbsoma

Expand Down
5 changes: 4 additions & 1 deletion libtiledbsoma/test/test_indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
*/

#include <reindexer/reindexer.h>
#include <soma/soma_context.h>
#include <catch2/catch_test_macros.hpp>
#include <cstdint>
#include <memory>
#include <string>
#include <tiledb/tiledb>
#include <unordered_map>
Expand All @@ -57,9 +59,10 @@ bool run_test(int id, std::vector<int64_t> keys, std::vector<int64_t> lookups) {
try {
std::vector<int64_t> indexer_results;
indexer_results.resize(lookups.size());
auto context = std::make_shared<tiledbsoma::SOMAContext>();

tiledbsoma::IntIndexer indexer;
indexer.map_locations(keys);
indexer.map_locations(keys, context);
auto* hash = kh_init(m64);
int ret;
khint64_t k;
Expand Down

0 comments on commit bc0e22a

Please sign in to comment.