diff --git a/apis/python/src/tiledbsoma/_index_util.py b/apis/python/src/tiledbsoma/_index_util.py index 433e0433e0..1468dbbfbb 100644 --- a/apis/python/src/tiledbsoma/_index_util.py +++ b/apis/python/src/tiledbsoma/_index_util.py @@ -36,12 +36,6 @@ def tiledbsoma_build_index( Lifecycle: Experimental. """ - if context is not None: - tdb_concurrency = int( - context.tiledb_ctx.config().get("sm.compute_concurrency_level", 10) - ) - thread_count = max(1, tdb_concurrency // 2) - reindexer = clib.IntIndexer() - reindexer.map_locations(keys, thread_count) + reindexer.map_locations(keys, context) return reindexer # type: ignore[no-any-return] diff --git a/apis/python/src/tiledbsoma/reindexer.cc b/apis/python/src/tiledbsoma/reindexer.cc index 575c246ef6..4b6bfad439 100644 --- a/apis/python/src/tiledbsoma/reindexer.cc +++ b/apis/python/src/tiledbsoma/reindexer.cc @@ -46,23 +46,23 @@ void load_reindexer(py::module &m) { // between 0 and number of keys - 1) based on khash py::class_(m, "IntIndexer") .def(py::init<>()) - .def(py::init&, int>()) + .def(py::init&, std::shared_ptr>()) .def( "map_locations", [](IntIndexer& indexer, py::array_t keys, - int num_threads) { + std::shared_ptr context) { auto buffer = keys.request(); int64_t* data = static_cast(buffer.ptr); size_t length = buffer.shape[0]; - indexer.map_locations(keys.data(), keys.size(), num_threads); + indexer.map_locations(keys.data(), keys.size(), context); }) .def( "map_locations", [](IntIndexer& indexer, std::vector keys, - int num_threads) { - indexer.map_locations(keys.data(), keys.size(), num_threads); + int context) { + indexer.map_locations(keys.data(), keys.size(), context); }) // Perform lookup for a large input array of keys and return the looked // up value array (passing ownership from C++ to python) diff --git a/apis/python/tests/test_indexer.py b/apis/python/tests/test_indexer.py index 415f75749a..0b6b6c647a 100644 --- a/apis/python/tests/test_indexer.py +++ b/apis/python/tests/test_indexer.py @@ -21,7 +21,7 @@ def test_duplicate_key_indexer_error( ): context = _validate_soma_tiledb_context(SOMATileDBContext()) with pytest.raises(RuntimeError, match="There are duplicate keys."): - tiledbsoma_build_index(keys, context=context) + tiledbsoma_build_index(keys, context=context.native_context) pd_index = pd.Index(keys) with pytest.raises(pd.errors.InvalidIndexError): @@ -65,7 +65,7 @@ def test_duplicate_key_indexer_error( ) def test_indexer(keys: np.array, lookups: np.array): context = _validate_soma_tiledb_context(SOMATileDBContext()) - indexer = tiledbsoma_build_index(keys, context=context) + indexer = tiledbsoma_build_index(keys, context=context.native_context) results = indexer.get_indexer(lookups) panda_indexer = pd.Index(keys) panda_results = panda_indexer.get_indexer(lookups) diff --git a/libtiledbsoma/src/CMakeLists.txt b/libtiledbsoma/src/CMakeLists.txt index 923652c7c5..e89a870b10 100644 --- a/libtiledbsoma/src/CMakeLists.txt +++ b/libtiledbsoma/src/CMakeLists.txt @@ -58,6 +58,7 @@ add_library(TILEDB_SOMA_OBJECTS OBJECT ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_collection.cc ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_experiment.cc ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_measurement.cc + ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_context.cc ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_dataframe.cc ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_dense_ndarray.cc ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_sparse_ndarray.cc diff --git a/libtiledbsoma/src/reindexer/reindexer.cc b/libtiledbsoma/src/reindexer/reindexer.cc index fbd52fcc02..8ccb21dea1 100644 --- a/libtiledbsoma/src/reindexer/reindexer.cc +++ b/libtiledbsoma/src/reindexer/reindexer.cc @@ -47,16 +47,15 @@ KHASH_MAP_INIT_INT64(m64, int64_t) namespace tiledbsoma { void IntIndexer::map_locations( - const int64_t* keys, size_t size, size_t threads) { + const int64_t* keys, + size_t size, + std::shared_ptr context) { map_size_ = size; // Handling edge cases if (size == 0) { return; } - if (threads == 0) { - throw std::runtime_error("Re-indexer thread count cannot be zero."); - } hash_ = kh_init(m64); kh_resize(m64, hash_, size * 1.25); @@ -64,10 +63,8 @@ void IntIndexer::map_locations( khint64_t k; int64_t counter = 0; // Hash map construction - LOG_DEBUG(fmt::format( - "[Re-indexer] Start of Map locations with {} keys and {} threads", - size, - threads)); + LOG_DEBUG( + fmt::format("[Re-indexer] Start of Map locations with {} keys", size)); for (size_t i = 0; i < size; i++) { k = kh_put(m64, hash_, keys[i], &ret); assert(k != kh_end(hash_)); @@ -79,18 +76,19 @@ void IntIndexer::map_locations( } auto hsize = kh_size(hash_); LOG_DEBUG(fmt::format("[Re-indexer] khash size = {}", hsize)); - if (threads > 1) { - tiledb_thread_pool_ = std::make_unique(threads); - } + LOG_DEBUG( fmt::format("[Re-indexer] Thread pool started and hash table created")); + context_ = context; } void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { if (size == 0) { return; } - if (tiledb_thread_pool_ == nullptr) { // When concurrency is 1 + // Single thread checks + if (context_ == nullptr || context_->thread_pool() == nullptr || + context_->thread_pool()->concurrency_level() == 1) { for (int i = 0; i < size; i++) { auto k = kh_get(m64, hash_, keys[i]); if (k == kh_end(hash_)) { @@ -104,12 +102,13 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { } LOG_DEBUG(fmt::format( "Lookup with thread concurrency {} on data size {}", - tiledb_thread_pool_->concurrency_level(), + context_->thread_pool()->concurrency_level(), size)); std::vector tasks; - size_t thread_chunk_size = size / tiledb_thread_pool_->concurrency_level(); + size_t thread_chunk_size = size / + context_->thread_pool()->concurrency_level(); if (thread_chunk_size == 0) { thread_chunk_size = 1; } @@ -122,7 +121,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { } LOG_DEBUG(fmt::format( "Creating tileDB task for the range from {} to {} ", start, end)); - tiledbsoma::ThreadPool::Task task = tiledb_thread_pool_->execute( + tiledbsoma::ThreadPool::Task task = context_->thread_pool()->execute( [this, start, end, &results, &keys]() { for (size_t i = start; i < end; i++) { auto k = kh_get(m64, hash_, keys[i]); @@ -142,7 +141,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { start, end)); } - tiledb_thread_pool_->wait_all(tasks); + context_->thread_pool()->wait_all(tasks); } IntIndexer::~IntIndexer() { @@ -151,8 +150,11 @@ IntIndexer::~IntIndexer() { } } -IntIndexer::IntIndexer(const int64_t* keys, int size, int threads) { - map_locations(keys, size, threads); +IntIndexer::IntIndexer( + const int64_t* keys, + int size, + std::shared_ptr context) { + map_locations(keys, size, context); } } // namespace tiledbsoma \ No newline at end of file diff --git a/libtiledbsoma/src/reindexer/reindexer.h b/libtiledbsoma/src/reindexer/reindexer.h index 5996ab5065..24d1215de6 100644 --- a/libtiledbsoma/src/reindexer/reindexer.h +++ b/libtiledbsoma/src/reindexer/reindexer.h @@ -44,6 +44,7 @@ struct kh_m64_s; namespace tiledbsoma { class ThreadPool; +class SOMAContext; class IntIndexer { public: @@ -53,9 +54,14 @@ class IntIndexer { * @param size yhr number of keys in the put * @param threads number of threads in the thread pool */ - void map_locations(const int64_t* keys, size_t size, size_t threads = 4); - void map_locations(const std::vector& keys, size_t threads = 4) { - map_locations(keys.data(), keys.size(), threads); + void map_locations( + const int64_t* keys, + size_t size, + std::shared_ptr context); + void map_locations( + const std::vector& keys, + std::shared_ptr context) { + map_locations(keys.data(), keys.size(), context); } /** * Used for parallel lookup using khash @@ -77,9 +83,14 @@ class IntIndexer { /** * Constructor with the same arguments as map_locations */ - IntIndexer(const int64_t* keys, int size, int threads); - IntIndexer(const std::vector& keys, int threads) - : IntIndexer(keys.data(), keys.size(), threads) { + IntIndexer( + const int64_t* keys, + int size, + std::shared_ptr context); + IntIndexer( + const std::vector& keys, + std::shared_ptr context) + : IntIndexer(keys.data(), keys.size(), context) { } virtual ~IntIndexer(); @@ -88,10 +99,8 @@ class IntIndexer { * The created 64bit hash table */ kh_m64_s* hash_; - /* - * TileDB threadpool - */ - std::shared_ptr tiledb_thread_pool_ = nullptr; + + std::shared_ptr context_ = nullptr; /* * Number of elements in the map set by map_locations */ diff --git a/libtiledbsoma/src/soma/soma_context.cc b/libtiledbsoma/src/soma/soma_context.cc new file mode 100644 index 0000000000..801495e61c --- /dev/null +++ b/libtiledbsoma/src/soma/soma_context.cc @@ -0,0 +1,49 @@ +/** + * @file soma_dataframe.h + * + * @section LICENSE + * + * The MIT License + * + * @copyright Copyright (c) 2023 TileDB, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + * @section DESCRIPTION + * + * This file defines the SOMAContext class. + */ +#include "soma_context.h" +#include + +namespace tiledbsoma { + +void SOMAContext::create_thread_pool() { + // Extracting concurrency from tiledb config + auto cfg = tiledb_config(); + int concurrency = 10; + if (cfg.find("sm.compute_concurrency_level") != cfg.end()) { + concurrency = std::stoi(cfg["sm.compute_concurrency_level"]); + } + int thread_count = std::max(1, concurrency / 2); + if (thread_count > 1) { + thread_pool() = std::make_shared(thread_count); + } +} +} // namespace tiledbsoma diff --git a/libtiledbsoma/src/soma/soma_context.h b/libtiledbsoma/src/soma/soma_context.h index 0766113082..be1237fb70 100644 --- a/libtiledbsoma/src/soma/soma_context.h +++ b/libtiledbsoma/src/soma/soma_context.h @@ -38,6 +38,7 @@ #include namespace tiledbsoma { +class ThreadPool; using namespace tiledb; @@ -47,10 +48,14 @@ class SOMAContext { //= public non-static //=================================================================== SOMAContext() - : ctx_(std::make_shared(Config({}))){}; + : ctx_(std::make_shared(Config({}))) { + create_thread_pool(); + }; SOMAContext(std::map platform_config) - : ctx_(std::make_shared(Config(platform_config))){}; + : ctx_(std::make_shared(Config(platform_config))) { + create_thread_pool(); + }; bool operator==(const SOMAContext& other) const { return ctx_ == other.ctx_; @@ -67,13 +72,22 @@ class SOMAContext { return cfg; } + std::shared_ptr& thread_pool() { + return thread_pool_; + } + private: //=================================================================== //= private non-static //=================================================================== + void create_thread_pool(); + // TileDB context std::shared_ptr ctx_; + + // Threadpool + std::shared_ptr thread_pool_ = nullptr; }; } // namespace tiledbsoma diff --git a/libtiledbsoma/test/test_indexer.cc b/libtiledbsoma/test/test_indexer.cc index c99d13ff61..784297dc48 100644 --- a/libtiledbsoma/test/test_indexer.cc +++ b/libtiledbsoma/test/test_indexer.cc @@ -31,8 +31,10 @@ */ #include +#include #include #include +#include #include #include #include @@ -57,9 +59,10 @@ bool run_test(int id, std::vector keys, std::vector lookups) { try { std::vector indexer_results; indexer_results.resize(lookups.size()); + auto context = std::make_shared(); tiledbsoma::IntIndexer indexer; - indexer.map_locations(keys); + indexer.map_locations(keys, context); auto* hash = kh_init(m64); int ret; khint64_t k;