diff --git a/apis/python/src/tiledbsoma/managed_query.cc b/apis/python/src/tiledbsoma/managed_query.cc index 4b21db33e6..638748e78c 100644 --- a/apis/python/src/tiledbsoma/managed_query.cc +++ b/apis/python/src/tiledbsoma/managed_query.cc @@ -136,34 +136,55 @@ void load_managed_query(py::module& m) { } }) + // .def( + // "next", + // [](ManagedQuery& mq) -> std::optional { + // // Release python GIL before reading data + // py::gil_scoped_release release; + + // mq.setup_read(); + + // if (mq.is_empty_query() && mq.is_first_read()) { + // auto tbl = mq.results(); + // // Acquire python GIL before accessing python objects + // py::gil_scoped_acquire acquire; + // return to_table(std::make_optional(tbl)); + // } + + // if (mq.is_complete(false)) { + // throw py::stop_iteration(); + // } + + // try { + // mq.submit_read(); + // auto tbl = mq.results(); + // // Acquire python GIL before accessing python objects + // py::gil_scoped_acquire acquire; + // return to_table(std::make_optional(tbl)); + // } catch (const std::exception& e) { + // throw TileDBSOMAError(e.what()); + // } + // }) + .def( "next", [](ManagedQuery& mq) -> std::optional { // Release python GIL before reading data py::gil_scoped_release release; - - mq.setup_read(); - - if (mq.is_empty_query() && mq.is_first_read()) { - auto tbl = mq.results(); + std::optional> tbl; + try { + tbl = mq.read_next(); // Acquire python GIL before accessing python objects - py::gil_scoped_acquire acquire; - return to_table(std::make_optional(tbl)); + } catch (const std::exception& e) { + throw TileDBSOMAError(e.what()); } + py::gil_scoped_acquire acquire; - if (mq.is_complete(false)) { + if (!tbl) { throw py::stop_iteration(); } - try { - mq.submit_read(); - auto tbl = mq.results(); - // Acquire python GIL before accessing python objects - py::gil_scoped_acquire acquire; - return to_table(std::make_optional(tbl)); - } catch (const std::exception& e) { - throw TileDBSOMAError(e.what()); - } + return to_table(tbl); }) .def( diff --git a/libtiledbsoma/src/soma/managed_query.cc b/libtiledbsoma/src/soma/managed_query.cc index e21be8dc15..3b89c9a9a6 100644 --- a/libtiledbsoma/src/soma/managed_query.cc +++ b/libtiledbsoma/src/soma/managed_query.cc @@ -203,6 +203,104 @@ void ManagedQuery::submit_read() { }); } +std::optional> ManagedQuery::read_next() { + setup_read(); + + if (is_empty_query() && !query_submitted_) { + query_submitted_ = true; + return buffers_; + } + + if (is_complete(false)) { + return std::nullopt; + } + + query_submitted_ = true; + query_future_ = std::async(std::launch::async, [&]() { + LOG_DEBUG("[ManagedQuery] submit thread start"); + try { + query_->submit(); + } catch (const std::exception& e) { + return StatusAndException(false, e.what()); + } + LOG_DEBUG("[ManagedQuery] submit thread done"); + return StatusAndException(true, "success"); + }); + + if (query_future_.valid()) { + LOG_DEBUG(std::format("[ManagedQuery] [{}] Waiting for query", name_)); + query_future_.wait(); + LOG_DEBUG( + std::format("[ManagedQuery] [{}] Done waiting for query", name_)); + + auto retval = query_future_.get(); + if (!retval.succeeded()) { + throw TileDBSOMAError(std::format( + "[ManagedQuery] [{}] Query FAILED: {}", + name_, + retval.message())); + } + + } else { + throw TileDBSOMAError( + std::format("[ManagedQuery] [{}] 'query_future_' invalid", name_)); + } + + auto status = query_->query_status(); + + if (status == Query::Status::FAILED) { + throw TileDBSOMAError( + std::format("[ManagedQuery] [{}] Query FAILED", name_)); + } + + // If the query was ever incomplete, the result buffers contents are not + // complete. + if (status == Query::Status::INCOMPLETE) { + results_complete_ = false; + } else if (status == Query::Status::COMPLETE) { + results_complete_ = true; + } + + // Update ColumnBuffer size to match query results + size_t num_cells = 0; + for (auto& name : buffers_->names()) { + num_cells = buffers_->at(name)->update_size(*query_); + LOG_DEBUG(std::format( + "[ManagedQuery] [{}] Buffer {} cells={}", name_, name, num_cells)); + } + total_num_cells_ += num_cells; + + // TODO: retry the query with larger buffers + if (status == Query::Status::INCOMPLETE && !num_cells) { + throw TileDBSOMAError( + std::format("[ManagedQuery] [{}] Buffers are too small.", name_)); + } + + // Visit all attributes and retrieve enumeration vectors + auto attribute_map = schema_->attributes(); + for (auto& nmit : attribute_map) { + auto attrname = nmit.first; + auto attribute = nmit.second; + auto enumname = AttributeExperimental::get_enumeration_name( + *ctx_, attribute); + if (enumname != std::nullopt) { + auto enumeration = ArrayExperimental::get_enumeration( + *ctx_, *array_, enumname.value()); + auto enumvec = enumeration.as_vector(); + if (!buffers_->contains(attrname)) { + continue; + } + auto colbuf = buffers_->at(attrname); + colbuf->add_enumeration(enumvec); + LOG_DEBUG(std::format( + "[ManagedQuery] got Enumeration '{}' for attribute '{}'", + enumname.value(), + attrname)); + } + } + return buffers_; +} + // Please see the header-file comments for context. void ManagedQuery::_fill_in_subarrays_if_dense(bool is_read) { LOG_TRACE("[ManagedQuery] _fill_in_subarrays enter"); diff --git a/libtiledbsoma/src/soma/managed_query.h b/libtiledbsoma/src/soma/managed_query.h index 146873f19e..b35b9c767f 100644 --- a/libtiledbsoma/src/soma/managed_query.h +++ b/libtiledbsoma/src/soma/managed_query.h @@ -281,6 +281,8 @@ class ManagedQuery { */ void setup_read(); + std::optional> read_next(); + /** * @brief Check if the query is complete. *