Skip to content

Commit

Permalink
apply SOMADataFrame mods to SOMAIndexedDataFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
johnkerl committed Oct 19, 2022
1 parent 4e89837 commit 1ba6bf5
Showing 1 changed file with 45 additions and 40 deletions.
85 changes: 45 additions & 40 deletions apis/python/src/tiledbsoma/soma_indexed_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

from . import util_arrow, util_tiledb
from .constants import SOMA_JOINID

# This package's pybind11 code
import tiledbsoma.libtiledbsoma as clib

from . import query_condition as qcmodule
from .soma_collection import SOMACollectionBase
from .tiledb_array import TileDBArray
from .types import Ids, SOMAResultOrder
Expand Down Expand Up @@ -159,64 +164,63 @@ def get_index_column_names(self) -> Sequence[str]:
def read(
self,
*,
# TODO: find out how to spell this in a way the type-checker will accept :(
# ids: Optional[Union[Sequence[int], str, Slice]] = None,
# TODO: find the right syntax to get the typechecker to accept args like ``ids=slice(0,10)``
# ids: Optional[Union[Sequence[int], Slice]] = None,
ids: Optional[Any] = None,
value_filter: Optional[str] = None,
column_names: Optional[Sequence[str]] = None,
result_order: Optional[SOMAResultOrder] = None,
# TODO: more arguments
# TODO: batch_size
# TODO: partition,
# TODO: platform_config,
) -> Iterator[pa.Table]:
"""
Read a user-defined subset of data, addressed by the dataframe indexing columns, optionally filtered, and return results as one or more Arrow.Table.
Read a user-defined subset of data, addressed by the dataframe indexing column, optionally filtered, and return results as one or more ``Arrow.Table``.
:param ids: for each index dimension, which rows to read. Defaults to ``None``, meaning no constraint -- all IDs.
:param ids: Which rows to read. Defaults to ``None``, meaning no constraint -- all rows.
:param column_names: the named columns to read and return. Defaults to ``None``, meaning no constraint -- all column names.
:param partitions: an optional ``SOMAReadPartitions`` hint to indicate how results should be organized.
:param result_order: order of read results. This can be one of 'row-major', 'col-major', or 'unordered'.
:param result_order: order of read results. This can be one of 'row-major', 'col-major', or 'unordered'.
:param value_filter: an optional [value filter] to apply to the results. Defaults to no filter.
**Indexing**: the ``ids`` parameter will support, per dimension: a list of values of the type of the indexed column.
**Indexing**: the ``ids`` parameter will support, per dimension: a row offset (uint), a row-offset range (slice), or a list of both.
"""
tiledb_result_order = util_tiledb.tiledb_result_order_from_soma_result_order(
result_order, accept=["row-major", "column-major", "unordered"]
)

# TODO: more about index_column_names
with self._tiledb_open("r") as A:
dim_names, attr_names = util_tiledb.split_column_names(
A.schema, column_names
query_condition = None
if value_filter is not None:
query_condition = qcmodule.QueryCondition(value_filter)

# As an arg to this method, `column_names` is optional-None. For the pybind11
# code it's optional-[].
lib_column_names = [] if column_names is None else column_names

sr = clib.SOMAReader(
self._uri,
name=self.__class__.__name__,
schema=A.schema, # query_condition needs this
column_names=lib_column_names,
query_condition=query_condition,
)
if value_filter is None:
query = A.query(
return_arrow=True,
return_incomplete=True,
order=tiledb_result_order,
dims=dim_names,
attrs=attr_names,
)
else:
qc = tiledb.QueryCondition(value_filter)
query = A.query(
return_arrow=True,
return_incomplete=True,
attr_cond=qc,
order=tiledb_result_order,
dims=dim_names,
attrs=attr_names,
)

if ids is None:
iterator = query.df[:]
else:
iterator = query.df[ids]

for table in iterator:
yield table
if ids is not None:
# XXX TODO NEEDS TO ALWAYS BE A LIST NO MATTER WHAT
if isinstance(ids, slice):
ids = util.slice_to_list(ids)
sr.set_dim_points(A.schema.domain.dim(0).name, ids)

# TODO: platform_config
# TODO: batch_size
# TODO: result_order

sr.submit()

while arrow_table := sr.read_next():
# yield util_arrow.ascii_to_unicode_pyarrow_readback(batch)
yield arrow_table # XXX what other post-processing

def read_all(
self,
Expand All @@ -229,10 +233,11 @@ def read_all(
result_order: Optional[SOMAResultOrder] = None,
# TODO: batch_size
# TODO: partition,
# TODO: result_order,
# TODO: platform_config,
) -> pa.Table:
"""
This is a convenience method around ``read``. It iterates the return value from ``read`` and returns a concatenation of all the table-pieces found. Its nominal use is to simply unit-test cases.
This is a convenience method around ``read``. It concatenates all partial read results into a single Table. Its nominal use is to simplify unit-test cases.
"""
return pa.concat_tables(
self.read(
Expand Down

0 comments on commit 1ba6bf5

Please sign in to comment.