Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[c++/python] Expanded enumeration support in ArrowAdapter::to_arrow #1848

Merged
merged 5 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions apis/python/src/tiledbsoma/_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def write(

enmr = self._handle.enum(attr.name)

# get new enumeration values
# get new enumeration values, maintain original ordering
update_vals = []
for new_val in col.chunk(0).dictionary.tolist():
if new_val not in enmr.values():
Expand All @@ -429,7 +429,13 @@ def write(
# only extend if there are new values
if update_vals:
se = tiledb.ArraySchemaEvolution(self.context.tiledb_ctx)
new_enmr = enmr.extend(update_vals)
if np.issubdtype(enmr.dtype.type, np.str_):
extend_vals = np.array(update_vals, "U")
elif np.issubdtype(enmr.dtype.type, np.bytes_):
extend_vals = np.array(update_vals, "S")
else:
extend_vals = np.array(update_vals, enmr.dtype)
new_enmr = enmr.extend(extend_vals)
se.extend_enumeration(new_enmr)
se.array_evolve(uri=self.uri)

Expand Down
54 changes: 22 additions & 32 deletions apis/python/src/tiledbsoma/pytiledbsoma.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,52 +182,42 @@ bool get_enum_is_ordered(SOMAArray& sr, std::string attr_name){
return attr_to_enmrs.at(attr_name).ordered();
}

/**
* @brief Convert ColumnBuffer to Arrow array.
*
* @param column_buffer ColumnBuffer
* @return py::object Arrow array
*/
py::object to_array(std::shared_ptr<ColumnBuffer> column_buffer) {
auto pa = py::module::import("pyarrow");
auto pa_array_import = pa.attr("Array").attr("_import_from_c");

auto [array, schema] = ArrowAdapter::to_arrow(column_buffer);
return pa_array_import(py::capsule(array.get()), py::capsule(schema.get()));
}

/**
* @brief Convert ArrayBuffers to Arrow table.
*
* @param cbs ArrayBuffers
* @return py::object
*/
py::object to_table(SOMAArray& sr, std::shared_ptr<ArrayBuffers> array_buffers) {
py::object _buffer_to_table(std::shared_ptr<ArrayBuffers> buffers) {
auto pa = py::module::import("pyarrow");
auto pa_table_from_arrays = pa.attr("Table").attr("from_arrays");
auto pa_dict_from_arrays = pa.attr("DictionaryArray").attr("from_arrays");
auto pa_array_import = pa.attr("Array").attr("_import_from_c");
auto pa_schema_import = pa.attr("Schema").attr("_import_from_c");

py::list array_list;
py::list names;
py::list arrays;

for (auto& name : array_buffers->names()) {
auto column = array_buffers->at(name);
for (auto& name : buffers->names()) {
auto column = buffers->at(name);
auto [pa_array, pa_schema] = ArrowAdapter::to_arrow(column);
auto array = pa_array_import(py::capsule(pa_array.get()),
py::capsule(pa_schema.get()));
array_list.append(array);
names.append(name);

if(sr.get_attr_to_enum_mapping().count(name) == 0){
arrays.append(to_array(column));
}else{
arrays.append(pa_dict_from_arrays(
to_array(column),
get_enum(sr, name),
py::none(),
get_enum_is_ordered(sr, name)));
}
}

auto pa_table = pa_table_from_arrays(arrays, names);
return pa_table_from_arrays(array_list, names);
}

std::optional<py::object> to_table(
std::optional<std::shared_ptr<ArrayBuffers>> buffers){
// If more data was read, convert it to an arrow table and return
if (buffers.has_value()) {
return _buffer_to_table(*buffers);
}

return pa_table;
// No data was read, the query is complete, return nullopt
return std::nullopt;
}

/**
Expand Down Expand Up @@ -681,7 +671,7 @@ PYBIND11_MODULE(pytiledbsoma, m) {
if (buffers.has_value()) {
// Acquire python GIL before accessing python objects
py::gil_scoped_acquire acquire;
return to_table(reader, *buffers);
return to_table(*buffers);
}

// No data was read, the query is complete, return nullopt
Expand Down
88 changes: 88 additions & 0 deletions apis/python/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,3 +1065,91 @@ def test_timestamped_ops(tmp_path, allows_duplicates, consolidate):
assert list(x.as_py() for x in tab["string"]) == ["apple"]
assert sidf.tiledb_timestamp_ms == 1615402887987
assert sidf.tiledb_timestamp.isoformat() == "2021-03-10T19:01:27.987000+00:00"


def test_extend_enumerations(tmp_path):
pandas_df = pd.DataFrame(
{
"soma_joinid": pd.Series([0, 1, 2, 3, 4, 5], dtype=np.int64),
"str": pd.Series(["A", "B", "A", "B", "B", "B"], dtype="category"),
"byte": pd.Series([b"A", b"B", b"A", b"B", b"B", b"B"], dtype="category"),
"bool": pd.Series(
[True, False, True, False, False, False], dtype="category"
),
"int64": pd.Series(
np.array([0, 1, 2, 0, 1, 2], dtype=np.int64), dtype="category"
),
"uint64": pd.Series(
np.array([0, 1, 2, 0, 1, 2], dtype=np.uint64), dtype="category"
),
"int32": pd.Series(
np.array([0, 1, 2, 0, 1, 2], dtype=np.int32), dtype="category"
),
"uint32": pd.Series(
np.array([0, 1, 2, 0, 1, 2], dtype=np.uint32), dtype="category"
),
"int16": pd.Series(
np.array([0, 1, 2, 0, 1, 2], dtype=np.int16), dtype="category"
),
"uint16": pd.Series(
np.array([0, 1, 2, 0, 1, 2], dtype=np.uint16), dtype="category"
),
"int8": pd.Series(
np.array([0, 1, 2, 0, 1, 2], dtype=np.int8), dtype="category"
),
"uint8": pd.Series(
np.array([0, 1, 2, 0, 1, 2], dtype=np.uint8), dtype="category"
),
"float32": pd.Series(
np.array([0, 1.1, 2.1, 0, 1.1, 2.1], dtype=np.float32), dtype="category"
),
"float64": pd.Series(
np.array([0, 1.1, 2.1, 0, 1.1, 2.1], dtype=np.float64), dtype="category"
),
"float64_w_non_finite": pd.Series(
np.array([0, 1.1, 2.1, 0, np.Inf, np.NINF], dtype=np.float64),
dtype="category",
),
"str_ordered": pd.Series(
pd.Categorical(
["A", "B", "A", "B", "B", "B"],
categories=["B", "A", "C"],
ordered=True,
),
),
"int64_ordered": pd.Series(
pd.Categorical(
[1, 2, 3, 3, 2, 1],
categories=np.array([3, 2, 1], dtype=np.int64),
ordered=True,
),
),
"uint64_ordered": pd.Series(
pd.Categorical(
[1, 2, 3, 3, 2, 1],
categories=np.array([3, 2, 1], dtype=np.uint64),
ordered=True,
),
),
"float64_ordered": pd.Series(
pd.Categorical(
[0, 1.1, 2.1, 0, 1.1, 2.1],
categories=np.array([1.1, 0, 2.1], dtype=np.float64),
ordered=True,
),
),
},
)

schema = pa.Schema.from_pandas(pandas_df, preserve_index=False)

with soma.DataFrame.create(str(tmp_path), schema=schema) as soma_dataframe:
tbl = pa.Table.from_pandas(pandas_df, preserve_index=False)
soma_dataframe.write(tbl)

with soma.open(str(tmp_path)) as soma_dataframe:
df = soma_dataframe.read().concat().to_pandas()
for c in df:
assert df[c].dtype == pandas_df[c].dtype
if df[c].dtype == "category":
assert df[c].cat.categories.dtype == pandas_df[c].cat.categories.dtype
2 changes: 1 addition & 1 deletion apis/r/src/rinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ Rcpp::List soma_array_reader(const std::string& uri,
auto buf = sr_data->get()->at(names[i]);

// this is pair of array and schema pointer
auto pp = tdbs::ArrowAdapter::to_arrow(buf, true);
auto pp = tdbs::ArrowAdapter::to_arrow(buf);

memcpy((void*) chldschemaxp, pp.second.get(), sizeof(ArrowSchema));
memcpy((void*) chldarrayxp, pp.first.get(), sizeof(ArrowArray));
Expand Down
2 changes: 1 addition & 1 deletion apis/r/src/riterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ Rcpp::List sr_next(Rcpp::XPtr<tdbs::SOMAArray> sr) {
auto buf = sr_data->get()->at(names[i]);

// this is pair of array and schema pointer
auto pp = tdbs::ArrowAdapter::to_arrow(buf, true);
auto pp = tdbs::ArrowAdapter::to_arrow(buf);

memcpy((void*) chldschemaxp, pp.second.get(), sizeof(ArrowSchema));
memcpy((void*) chldarrayxp, pp.first.get(), sizeof(ArrowArray));
Expand Down
2 changes: 1 addition & 1 deletion libtiledbsoma/src/soma/column_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class ColumnBuffer {
return is_nullable_;
}

std::optional<Enumeration> get_enumeration() const {
std::optional<Enumeration> get_enumeration_info() const {
return enumeration_;
}

Expand Down
Loading
Loading