Skip to content

Commit

Permalink
WIP try implementing ManagedQuery::read_next again
Browse files Browse the repository at this point in the history
* This passes locally but have had issues on CI in past attempts
  • Loading branch information
nguyenv committed Dec 9, 2024
1 parent e08d648 commit 4b7f6bc
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 17 deletions.
55 changes: 38 additions & 17 deletions apis/python/src/tiledbsoma/managed_query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,34 +136,55 @@ void load_managed_query(py::module& m) {
}
})

// .def(
// "next",
// [](ManagedQuery& mq) -> std::optional<py::object> {
// // Release python GIL before reading data
// py::gil_scoped_release release;

// mq.setup_read();

// if (mq.is_empty_query() && mq.is_first_read()) {
// auto tbl = mq.results();
// // Acquire python GIL before accessing python objects
// py::gil_scoped_acquire acquire;
// return to_table(std::make_optional(tbl));
// }

// if (mq.is_complete(false)) {
// throw py::stop_iteration();
// }

// try {
// mq.submit_read();
// auto tbl = mq.results();
// // Acquire python GIL before accessing python objects
// py::gil_scoped_acquire acquire;
// return to_table(std::make_optional(tbl));
// } catch (const std::exception& e) {
// throw TileDBSOMAError(e.what());
// }
// })

.def(
"next",
[](ManagedQuery& mq) -> std::optional<py::object> {
// Release python GIL before reading data
py::gil_scoped_release release;

mq.setup_read();

if (mq.is_empty_query() && mq.is_first_read()) {
auto tbl = mq.results();
std::optional<std::shared_ptr<ArrayBuffers>> tbl;
try {
tbl = mq.read_next();
// Acquire python GIL before accessing python objects
py::gil_scoped_acquire acquire;
return to_table(std::make_optional(tbl));
} catch (const std::exception& e) {
throw TileDBSOMAError(e.what());
}
py::gil_scoped_acquire acquire;

if (mq.is_complete(false)) {
if (!tbl) {
throw py::stop_iteration();
}

try {
mq.submit_read();
auto tbl = mq.results();
// Acquire python GIL before accessing python objects
py::gil_scoped_acquire acquire;
return to_table(std::make_optional(tbl));
} catch (const std::exception& e) {
throw TileDBSOMAError(e.what());
}
return to_table(tbl);
})

.def(
Expand Down
98 changes: 98 additions & 0 deletions libtiledbsoma/src/soma/managed_query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,104 @@ void ManagedQuery::submit_read() {
});
}

std::optional<std::shared_ptr<ArrayBuffers>> ManagedQuery::read_next() {
setup_read();

if (is_empty_query() && !query_submitted_) {
query_submitted_ = true;
return buffers_;
}

if (is_complete(false)) {
return std::nullopt;
}

query_submitted_ = true;
query_future_ = std::async(std::launch::async, [&]() {
LOG_DEBUG("[ManagedQuery] submit thread start");
try {
query_->submit();
} catch (const std::exception& e) {
return StatusAndException(false, e.what());
}
LOG_DEBUG("[ManagedQuery] submit thread done");
return StatusAndException(true, "success");
});

if (query_future_.valid()) {
LOG_DEBUG(std::format("[ManagedQuery] [{}] Waiting for query", name_));
query_future_.wait();
LOG_DEBUG(
std::format("[ManagedQuery] [{}] Done waiting for query", name_));

auto retval = query_future_.get();
if (!retval.succeeded()) {
throw TileDBSOMAError(std::format(
"[ManagedQuery] [{}] Query FAILED: {}",
name_,
retval.message()));
}

} else {
throw TileDBSOMAError(
std::format("[ManagedQuery] [{}] 'query_future_' invalid", name_));
}

auto status = query_->query_status();

if (status == Query::Status::FAILED) {
throw TileDBSOMAError(
std::format("[ManagedQuery] [{}] Query FAILED", name_));
}

// If the query was ever incomplete, the result buffers contents are not
// complete.
if (status == Query::Status::INCOMPLETE) {
results_complete_ = false;
} else if (status == Query::Status::COMPLETE) {
results_complete_ = true;
}

// Update ColumnBuffer size to match query results
size_t num_cells = 0;
for (auto& name : buffers_->names()) {
num_cells = buffers_->at(name)->update_size(*query_);
LOG_DEBUG(std::format(
"[ManagedQuery] [{}] Buffer {} cells={}", name_, name, num_cells));
}
total_num_cells_ += num_cells;

// TODO: retry the query with larger buffers
if (status == Query::Status::INCOMPLETE && !num_cells) {
throw TileDBSOMAError(
std::format("[ManagedQuery] [{}] Buffers are too small.", name_));
}

// Visit all attributes and retrieve enumeration vectors
auto attribute_map = schema_->attributes();
for (auto& nmit : attribute_map) {
auto attrname = nmit.first;
auto attribute = nmit.second;
auto enumname = AttributeExperimental::get_enumeration_name(
*ctx_, attribute);
if (enumname != std::nullopt) {
auto enumeration = ArrayExperimental::get_enumeration(
*ctx_, *array_, enumname.value());
auto enumvec = enumeration.as_vector<std::string>();
if (!buffers_->contains(attrname)) {
continue;
}
auto colbuf = buffers_->at(attrname);
colbuf->add_enumeration(enumvec);
LOG_DEBUG(std::format(
"[ManagedQuery] got Enumeration '{}' for attribute '{}'",
enumname.value(),
attrname));
}
}
return buffers_;
}

// Please see the header-file comments for context.
void ManagedQuery::_fill_in_subarrays_if_dense(bool is_read) {
LOG_TRACE("[ManagedQuery] _fill_in_subarrays enter");
Expand Down
2 changes: 2 additions & 0 deletions libtiledbsoma/src/soma/managed_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ class ManagedQuery {
*/
void setup_read();

std::optional<std::shared_ptr<ArrayBuffers>> read_next();

/**
* @brief Check if the query is complete.
*
Expand Down

0 comments on commit 4b7f6bc

Please sign in to comment.