Skip to content

Commit

Permalink
[c++] Write data in ArrowArray to SOMAArray (#2425)
Browse files Browse the repository at this point in the history
  • Loading branch information
nguyenv authored Apr 18, 2024
1 parent d4138cc commit 4487347
Show file tree
Hide file tree
Showing 12 changed files with 514 additions and 206 deletions.
2 changes: 1 addition & 1 deletion libtiledbsoma/src/soma/array_buffers.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ using namespace tiledb;
class ArrayBuffers {
public:
ArrayBuffers() = default;
ArrayBuffers(const ArrayBuffers&) = delete;
ArrayBuffers(const ArrayBuffers&) = default;
ArrayBuffers(ArrayBuffers&&) = default;
~ArrayBuffers() = default;

Expand Down
21 changes: 13 additions & 8 deletions libtiledbsoma/src/soma/column_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ std::shared_ptr<ColumnBuffer> ColumnBuffer::create(
}

return ColumnBuffer::alloc(
schema,
schema.context().config(),
name_str,
type,
is_var,
Expand All @@ -91,7 +91,13 @@ std::shared_ptr<ColumnBuffer> ColumnBuffer::create(
}

return ColumnBuffer::alloc(
schema, name_str, type, is_var, false, std::nullopt, false);
schema.context().config(),
name_str,
type,
is_var,
false,
std::nullopt,
false);
}

throw TileDBSOMAError("[ColumnBuffer] Column name not found: " + name_str);
Expand Down Expand Up @@ -211,7 +217,7 @@ std::string_view ColumnBuffer::string_view(uint64_t index) {
//===================================================================

std::shared_ptr<ColumnBuffer> ColumnBuffer::alloc(
ArraySchema schema,
Config config,
std::string_view name,
tiledb_datatype_t type,
bool is_var,
Expand All @@ -221,7 +227,6 @@ std::shared_ptr<ColumnBuffer> ColumnBuffer::alloc(
// Set number of bytes for the data buffer. Override with a value from
// the config if present.
auto num_bytes = DEFAULT_ALLOC_BYTES;
auto config = schema.context().config();
if (config.contains(CONFIG_KEY_INIT_BYTES)) {
auto value_str = config.get(CONFIG_KEY_INIT_BYTES);
try {
Expand All @@ -235,10 +240,10 @@ std::shared_ptr<ColumnBuffer> ColumnBuffer::alloc(
}
}

bool is_dense = schema.array_type() == TILEDB_DENSE;
if (is_dense) {
// TODO: Handle dense arrays similar to tiledb python module
}
// bool is_dense = schema.array_type() == TILEDB_DENSE;
// if (is_dense) {
// // TODO: Handle dense arrays similar to tiledb python module
// }

// For variable length column types, allocate an extra num_bytes to hold
// offset values. The number of cells is set by the size of the
Expand Down
79 changes: 55 additions & 24 deletions libtiledbsoma/src/soma/column_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
#include <tiledb/tiledb>
#include <tiledb/tiledb_experimental>

#include "../utils/arrow_adapter.h"
#include "../utils/common.h"
#include "soma_context.h"
#include "span/span.hpp"

namespace tiledbsoma {
Expand Down Expand Up @@ -69,28 +71,6 @@ class ColumnBuffer {
static std::shared_ptr<ColumnBuffer> create(
std::shared_ptr<Array> array, std::string_view name);

/**
 * @brief Create a ColumnBuffer for a named column of a TileDB array and
 * fill its data buffer with a copy of the given values.
 *
 * The buffer is first created through the (array, name) overload of
 * create(); the raw bytes of `data` are then copied into it and the cell
 * count is set to the number of elements in `data`.
 *
 * @tparam T Element type of the values being copied
 * @param array TileDB array
 * @param name TileDB dimension or attribute name
 * @param data Data to set in buffer
 * @return ColumnBuffer
 */
template <typename T>
static std::shared_ptr<ColumnBuffer> create(
    std::shared_ptr<Array> array,
    std::string_view name,
    std::vector<T> data) {
    auto column_buff = ColumnBuffer::create(array, name);
    column_buff->num_cells_ = data.size();

    // assign() replaces the whole buffer, so no prior resize is needed.
    const auto* first = reinterpret_cast<const std::byte*>(data.data());
    column_buff->data_.assign(first, first + data.size() * sizeof(T));
    return column_buff;
}

/**
* @brief Convert a bytemap to a bitmap in place.
*
Expand Down Expand Up @@ -136,6 +116,45 @@ class ColumnBuffer {
*/
void attach(Query& query);

/**
 * @brief Set the ColumnBuffer's data by copying from caller-owned memory.
 *
 * When `offsets` is provided, `num_elems + 1` offsets are copied and the
 * last offset is taken as the number of data bytes to copy. Otherwise
 * `num_elems * type_size_` bytes are copied and the recorded data size is
 * the element count.
 *
 * For nullable columns, the validity buffer is copied from `validity`
 * (one byte per element) or, when `validity` is null, filled with 1s.
 *
 * @param num_elems the number of elements in the column
 * @param data pointer to the beginning of the data to write
 * @param offsets optional pointer to `num_elems + 1` offsets
 * @param validity optional pointer to `num_elems` validity bytes
 */
void set_data(
    uint64_t num_elems,
    const void* data,
    uint64_t* offsets = nullptr,
    uint8_t* validity = nullptr) {
    num_cells_ = num_elems;

    const auto* bytes = static_cast<const std::byte*>(data);

    if (offsets != nullptr) {
        // assign() replaces the full contents, so no prior resize is needed.
        const uint64_t num_offsets = num_elems + 1;
        offsets_.assign(offsets, offsets + num_offsets);

        // The final offset gives the total number of data bytes.
        data_size_ = offsets_[num_offsets - 1];
        data_.assign(bytes, bytes + data_size_);
    } else {
        // Here the recorded size is the element count, not a byte count.
        data_size_ = num_elems;
        data_.assign(bytes, bytes + num_elems * type_size_);
    }

    if (is_nullable_) {
        if (validity != nullptr) {
            validity_.assign(validity, validity + num_elems);
        } else {
            // No validity supplied: mark every element with 1.
            validity_.assign(num_elems, uint8_t{1});
        }
    }
}

/**
* @brief Size num_cells_ to match the read query results.
*
Expand All @@ -152,6 +171,15 @@ class ColumnBuffer {
return num_cells_;
}

/**
* @brief Return the size recorded for the data buffer.
*
* The value is whatever set_data() last stored: the final offset (the
* number of data bytes copied) when offsets were supplied, otherwise the
* number of elements.
*
* @return uint64_t
*/
uint64_t data_size() {
return data_size_;
}

/**
* @brief Return a view of the ColumnBuffer data.
*
Expand Down Expand Up @@ -342,7 +370,7 @@ class ColumnBuffer {
/**
* @brief Allocate and return a ColumnBuffer.
*
* @param array TileDB array
* @param config TileDB Config
* @param name Column name
* @param type TileDB datatype
* @param is_var True if variable length data
Expand All @@ -352,7 +380,7 @@ class ColumnBuffer {
* @return ColumnBuffer
*/
static std::shared_ptr<ColumnBuffer> alloc(
ArraySchema schema,
Config config,
std::string_view name,
tiledb_datatype_t type,
bool is_var,
Expand All @@ -370,6 +398,9 @@ class ColumnBuffer {
// Data type of the column from the schema.
tiledb_datatype_t type_;

// Data size, calculated differently for var vs non-var columns: set_data()
// stores the final offset (data byte count) when offsets are given, and the
// element count otherwise. Zero until set_data() is called.
uint64_t data_size_ = 0;

// Bytes per element.
uint64_t type_size_;

Expand Down
126 changes: 124 additions & 2 deletions libtiledbsoma/src/soma/managed_query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,128 @@ void ManagedQuery::select_columns(
}
}

void ManagedQuery::set_column_data(
std::shared_ptr<ColumnBuffer> column_buffer) {
auto column_name = std::string(column_buffer->name());
bool has_attr = schema_->has_attribute(column_name);
bool is_sparse = array_->schema().array_type() == TILEDB_SPARSE;

if (is_sparse) {
auto data = column_buffer->data<std::byte>();
query_->set_data_buffer(
column_name, (void*)data.data(), column_buffer->data_size());
if (column_buffer->is_var()) {
auto offsets = column_buffer->offsets();
query_->set_offsets_buffer(
column_name, offsets.data(), offsets.size());
}
if (column_buffer->is_nullable()) {
auto validity = column_buffer->validity();
query_->set_validity_buffer(
column_name, validity.data(), validity.size());
}
} else {
if (has_attr) {
auto data = column_buffer->data<std::byte>();
query_->set_data_buffer(
column_name, (void*)data.data(), column_buffer->data_size());
if (column_buffer->is_var()) {
auto offsets = column_buffer->offsets();
query_->set_offsets_buffer(
column_name, offsets.data(), offsets.size());
}
if (column_buffer->is_nullable()) {
auto validity = column_buffer->validity();
query_->set_validity_buffer(
column_name, validity.data(), validity.size());
}
} else {
switch (column_buffer->type()) {
case TILEDB_STRING_ASCII:
case TILEDB_STRING_UTF8:
case TILEDB_CHAR:
case TILEDB_BLOB:
subarray_->add_range(
column_name,
column_buffer->data<std::string>()[0],
column_buffer->data<std::string>()[1]);
break;
case TILEDB_FLOAT32:
subarray_->add_range(
column_name,
column_buffer->data<float>()[0],
column_buffer->data<float>()[1]);
break;
case TILEDB_FLOAT64:
subarray_->add_range(
column_name,
column_buffer->data<double>()[0],
column_buffer->data<double>()[1]);
break;
case TILEDB_UINT8:
subarray_->add_range(
column_name,
column_buffer->data<uint8_t>()[0],
column_buffer->data<uint8_t>()[1]);
break;
case TILEDB_INT8:
subarray_->add_range(
column_name,
column_buffer->data<int8_t>()[0],
column_buffer->data<int8_t>()[1]);
break;
case TILEDB_UINT16:
subarray_->add_range(
column_name,
column_buffer->data<uint16_t>()[0],
column_buffer->data<uint16_t>()[1]);
break;
case TILEDB_INT16:
subarray_->add_range(
column_name,
column_buffer->data<int16_t>()[0],
column_buffer->data<int16_t>()[1]);
break;
case TILEDB_UINT32:
subarray_->add_range(
column_name,
column_buffer->data<uint32_t>()[0],
column_buffer->data<uint32_t>()[1]);
break;
case TILEDB_INT32:
subarray_->add_range(
column_name,
column_buffer->data<int32_t>()[0],
column_buffer->data<int32_t>()[1]);
break;
case TILEDB_UINT64:
subarray_->add_range(
column_name,
column_buffer->data<uint64_t>()[0],
column_buffer->data<uint64_t>()[1]);
break;
case TILEDB_INT64:
case TILEDB_TIME_SEC:
case TILEDB_TIME_MS:
case TILEDB_TIME_US:
case TILEDB_TIME_NS:
case TILEDB_DATETIME_SEC:
case TILEDB_DATETIME_MS:
case TILEDB_DATETIME_US:
case TILEDB_DATETIME_NS:
subarray_->add_range(
column_name,
column_buffer->data<int64_t>()[0],
column_buffer->data<int64_t>()[1]);
break;
default:
break;
}
query_->set_subarray(*subarray_);
}
}
}

void ManagedQuery::setup_read() {
// If the query is complete, return so we do not submit it again
auto status = query_->query_status();
Expand Down Expand Up @@ -150,6 +272,7 @@ void ManagedQuery::setup_read() {

/**
* Submit the write query, then finalize it.
*/
void ManagedQuery::submit_write() {
query_->submit();
// finalize() runs immediately after the submit.
// NOTE(review): presumably this completes the write so that no further
// submits are expected on this query — confirm against the TileDB docs.
query_->finalize();
}

void ManagedQuery::submit_read() {
Expand All @@ -168,7 +291,7 @@ std::shared_ptr<ArrayBuffers> ManagedQuery::results() {

if (query_future_.valid()) {
LOG_DEBUG(fmt::format("[ManagedQuery] [{}] Waiting for query", name_));
query_future_.get();
query_future_.wait();
} else {
throw TileDBSOMAError(
fmt::format("[ManagedQuery] [{}] 'query_future_' invalid", name_));
Expand Down Expand Up @@ -237,5 +360,4 @@ void ManagedQuery::check_column_name(const std::string& name) {
name));
}
}

}; // namespace tiledbsoma
Loading

0 comments on commit 4487347

Please sign in to comment.