Add Methods to DataFrameWrapper and ArrayWrapper
* Take care of formatting / typing
* Correct datetime domains
* Get full nonempty domains for `SOMADataFrame`
* Find missing open that needs to use `DataFrameWrapper`
nguyenv committed Oct 17, 2023
1 parent 1844b0e commit 7ee0b85
Showing 10 changed files with 1,345 additions and 70 deletions.
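In short, the TileDB-backed ArrayWrapper and the clib-backed DataFrameWrapper now expose the same read-side surface (schema, domain, nonempty_domain, ndim, attr_names, dim_names), so callers no longer need to branch on the handle type. A minimal usage sketch of that shared surface; the URI and the tiledbsoma.open entry point are illustrative assumptions, not part of this commit:

import tiledbsoma

# Hypothetical SOMADataFrame on local disk.
with tiledbsoma.open("./example_soma_dataframe", mode="r") as sdf:
    handle = sdf._handle  # expected to be a DataFrameWrapper in read mode after this commit
    print(handle.dim_names)          # index column names, e.g. ('soma_joinid',)
    print(handle.attr_names)         # non-index column names
    print(handle.domain)             # per-dimension (lo, hi) bounds
    print(handle.nonempty_domain())  # per-dimension nonempty bounds (may be None)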
8 changes: 5 additions & 3 deletions apis/python/src/tiledbsoma/_collection.py
@@ -33,11 +33,13 @@
from typing_extensions import Self

from . import _funcs, _tdb_handles
from . import pytiledbsoma as clib
from ._common_nd_array import NDArray
from ._dataframe import DataFrame
from ._dense_nd_array import DenseNDArray
from ._exception import SOMAError, is_does_not_exist_error
from ._sparse_nd_array import SparseNDArray
from ._tdb_handles import DataFrameWrapper
from ._tiledb_object import AnyTileDBObject, TileDBObject
from ._types import OpenTimestamp
from ._util import (
@@ -47,8 +49,6 @@
)
from .options import SOMATileDBContext
from .options._soma_tiledb_context import _validate_soma_tiledb_context
from ._tdb_handles import DataFrameWrapper
from . import pytiledbsoma as clib

# A collection can hold any sub-type of TileDBObject
CollectionElementType = TypeVar("CollectionElementType", bound=AnyTileDBObject)
@@ -427,7 +427,9 @@ def __getitem__(self, key: str) -> CollectionElementType:
raise KeyError(err_str) from None
if entry.soma is None:
from . import _factory # Delayed binding to resolve circular import.

from ._tdb_handles import Wrapper

wrapper: type[Wrapper[Any | Any | Any]]
if self.mode == "r" and clib.SOMADataFrame.exists(entry.entry.uri):
wrapper = DataFrameWrapper
else:
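The branch above means a collection member that is a SOMADataFrame is opened through the clib-backed DataFrameWrapper whenever the parent collection is in read mode. A rough sketch of exercising that path; the collection URI and member name are illustrative assumptions:

import tiledbsoma

with tiledbsoma.Collection.open("./example_collection", mode="r") as coll:
    obs = coll["obs"]  # goes through the __getitem__ path patched above
    # With mode == "r" and a SOMADataFrame member, the underlying handle is a
    # DataFrameWrapper (wrapping clib.SOMADataFrame) rather than a tiledb.Array wrapper.
    print(type(obs._handle).__name__)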
34 changes: 24 additions & 10 deletions apis/python/src/tiledbsoma/_dataframe.py
@@ -354,22 +354,22 @@ def read(
_util.check_unpartitioned(partitions)
self._check_open_read()

schema = self._handle.schema
query_condition = None
if value_filter is not None:
query_condition = QueryCondition(value_filter)
order = {
QueryCondition(value_filter)

to_clib_result_order = {
options.ResultOrder.AUTO: clib.ResultOrder.automatic,
options.ResultOrder.ROW_MAJOR: clib.ResultOrder.rowmajor,
options.ResultOrder.COLUMN_MAJOR: clib.ResultOrder.colmajor,
"auto": clib.ResultOrder.automatic,
"row-major": clib.ResultOrder.rowmajor,
"col-major": clib.ResultOrder.colmajor
"column-major": clib.ResultOrder.colmajor,
}
if result_order not in to_clib_result_order:
raise ValueError(f"Invalid result_order: {result_order}")

sr = self._handle._handle
sr.reset(column_names or [], "auto", order[result_order])
sr.reset(column_names or [], "auto", to_clib_result_order[result_order])

self._set_reader_coords(sr, coords)

@@ -505,7 +505,10 @@ def _set_reader_coord(
# Note: slice(None, None) matches the is_slice_of part, unless we also check the dim-type
# part.
if (is_slice_of(coord, str) or is_slice_of(coord, bytes)) and (
pa.types.is_large_string(dim.type) or pa.types.is_large_binary(dim.type) or pa.types.is_string(dim.type) or pa.types.is_binary(dim.type)
pa.types.is_large_string(dim.type)
or pa.types.is_large_binary(dim.type)
or pa.types.is_string(dim.type)
or pa.types.is_binary(dim.type)
):
_util.validate_slice(coord)
# Figure out which one.
@@ -515,7 +518,13 @@
if coord.stop is None:
# There's no way to specify "to infinity" for strings.
# We have to get the nonempty domain and use that as the end.
_, stop = self._handle.nonempty_domain(dim.name)
ned = self._handle.nonempty_domain()
if ned is None:
raise ValueError(
"Found empty nonempty domain when setting "
"string coordinates in _set_reader_coord"
)
_, stop = ned[dim_idx]
else:
stop = coord.stop
sr.set_dim_ranges_string_or_bytes(dim.name, [(start, stop)])
@@ -576,7 +585,12 @@ def _set_reader_coord_by_py_seq_or_np_array(
sr.set_dim_points_float64(dim.name, coord)
elif pa.types.is_float32(dim.type):
sr.set_dim_points_float32(dim.name, coord)
elif pa.types.is_large_string(dim.type) or pa.types.is_large_binary(dim.type) or pa.types.is_string(dim.type) or pa.types.is_binary(dim.type):
elif (
pa.types.is_large_string(dim.type)
or pa.types.is_large_binary(dim.type)
or pa.types.is_string(dim.type)
or pa.types.is_binary(dim.type)
):
sr.set_dim_points_string_or_bytes(dim.name, coord)
elif pa.types.is_timestamp(dim.type):
if not isinstance(coord, (tuple, list, np.ndarray)):
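For reference, the widened result_order handling in read() above accepts both the somacore enum members and their string spellings, and anything outside the to_clib_result_order mapping now raises a ValueError. A short sketch; the dataframe URI is an illustrative assumption:

import tiledbsoma
from somacore import options

with tiledbsoma.open("./example_soma_dataframe", mode="r") as sdf:
    tbl_enum = sdf.read(result_order=options.ResultOrder.ROW_MAJOR).concat()
    tbl_str = sdf.read(result_order="column-major").concat()
    # sdf.read(result_order="bogus") would raise ValueError("Invalid result_order: bogus")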
93 changes: 72 additions & 21 deletions apis/python/src/tiledbsoma/_tdb_handles.py
@@ -18,25 +18,24 @@
Mapping,
MutableMapping,
Optional,
Tuple,
Type,
TypeVar,
Union,
)

import attrs
import pyarrow as pa
import tiledb
from somacore import options
from typing_extensions import Literal, Self
import numpy as np
import pyarrow as pa

from . import pytiledbsoma as clib
from ._exception import DoesNotExistError, SOMAError, is_does_not_exist_error
from ._types import OpenTimestamp
from .options._soma_tiledb_context import SOMATileDBContext

from . import pytiledbsoma as clib

RawHandle = Union[tiledb.Array, tiledb.Group]
RawHandle = Union[tiledb.Array, tiledb.Group, clib.SOMADataFrame]
_RawHdl_co = TypeVar("_RawHdl_co", bound=RawHandle, covariant=True)
"""A raw TileDB object. Covariant because Handles are immutable enough."""

@@ -200,6 +199,32 @@ def _opener(
def schema(self) -> tiledb.ArraySchema:
return self._handle.schema

@property
def domain(self) -> Tuple[Tuple[Any, Any], ...]:
dom = self._handle.schema.domain
return tuple(dom.dim(i).domain for i in range(dom.ndim))

@property
def ndim(self) -> int:
return int(self._handle.schema.domain.ndim)

def nonempty_domain(self) -> Optional[Tuple[Tuple[Any, Any], ...]]:
try:
ned: Optional[Tuple[Tuple[Any, Any], ...]] = self._handle.nonempty_domain()
return ned
except tiledb.TileDBError as e:
raise SOMAError(e)

@property
def attr_names(self) -> Tuple[str, ...]:
schema = self._handle.schema
return tuple(schema.attr(i).name for i in range(schema.nattr))

@property
def dim_names(self) -> Tuple[str, ...]:
schema = self._handle.schema
return tuple(schema.domain.dim(i).name for i in range(schema.domain.ndim))


@attrs.define(frozen=True)
class GroupEntry:
@@ -237,7 +262,8 @@ def _do_initial_reads(self, reader: tiledb.Group) -> None:
self.initial_contents = {
o.name: GroupEntry.from_object(o) for o in reader if o.name is not None
}



class DataFrameWrapper(Wrapper[clib.SOMADataFrame]):
@classmethod
def _opener(
@@ -258,38 +284,63 @@ def _opener(
)

@property
def schema(self) -> tiledb.ArraySchema:
def schema(self) -> pa.Schema:
return self._handle.schema

@property
def meta(self):
return self._handle.meta
def meta(self) -> Dict[str, str]:
return dict(self._handle.meta)

@property
def domain(self):
def domain(self) -> Tuple[Tuple[Any, Any], ...]:
result = []
for name in self._handle.index_column_names:
dtype = self._handle.schema.field(name).type
if pa.types.is_timestamp(dtype):
dom = self._handle.domain(name)
np_dtype = dtype.to_pandas_dtype()
tz = np.datetime_data(np_dtype)[0]
result.append(
(np_dtype.type(dom[0], tz), np_dtype.type(dom[1], tz)))
(
np_dtype.type(dom[0], dtype.unit),
np_dtype.type(dom[1], dtype.unit),
)
)
else:
result.append(self._handle.domain(name))
return tuple(result)

@property
def ndim(self):
return self._handle.ndim

def ndim(self) -> int:
return int(self._handle.ndim)

def nonempty_domain(self) -> Optional[Tuple[Tuple[Any, Any], ...]]:
result = []
for name in self._handle.index_column_names:
dtype = self._handle.schema.field(name).type
if pa.types.is_timestamp(dtype):
ned = self._handle.nonempty_domain(name)
np_dtype = dtype.to_pandas_dtype()
result.append(
(
np_dtype.type(ned[0], dtype.unit),
np_dtype.type(ned[1], dtype.unit),
)
)
else:
result.append(self._handle.domain(name))
return None if len(result) == 0 else tuple(result)

@property
def attr_names(self) -> Tuple[str, ...]:
result = []
for field in self.schema:
if field.name not in self._handle.index_column_names:
result.append(field.name)
return tuple(result)

@property
def index_column_names(self):
def dim_names(self) -> Tuple[str, ...]:
return tuple(self._handle.index_column_names)

def nonempty_domain(self, name: str):
return self._handle.nonempty_domain(name)


class _DictMod(enum.Enum):
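To illustrate the datetime-domain fix in DataFrameWrapper.domain / nonempty_domain above: for a timestamp index column, the raw integer bounds from the clib handle are wrapped into numpy datetime64 values using the Arrow field's own unit (dtype.unit) instead of a unit re-derived from the pandas dtype. A self-contained sketch of just that conversion, with made-up bounds:

import numpy as np
import pyarrow as pa

dtype = pa.timestamp("s")           # Arrow type of a hypothetical timestamp index column
raw = (0, 1_700_000_000)            # integer bounds as the clib handle would report them
np_dtype = dtype.to_pandas_dtype()  # numpy datetime64 dtype; np_dtype.type is np.datetime64
lo = np_dtype.type(raw[0], dtype.unit)
hi = np_dtype.type(raw[1], dtype.unit)
print(lo, hi)  # 1970-01-01T00:00:00 2023-11-14T22:13:20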
43 changes: 12 additions & 31 deletions apis/python/src/tiledbsoma/_tiledb_array.py
@@ -64,7 +64,9 @@ def schema(self) -> pa.Schema:
Experimental.
"""
if isinstance(self._tiledb_array_schema(), tiledb.ArraySchema):
return tiledb_schema_to_arrow(self._tiledb_array_schema(), self.uri, self._ctx)
return tiledb_schema_to_arrow(
self._tiledb_array_schema(), self.uri, self._ctx
)
else:
return self._tiledb_array_schema()

@@ -78,32 +80,16 @@ def _tiledb_array_keys(self) -> Tuple[str, ...]:

def _tiledb_dim_names(self) -> Tuple[str, ...]:
"""Reads the dimension names from the schema: for example, ['obs_id', 'var_id']."""
schema = self._handle.schema
if isinstance(schema, tiledb.ArraySchema):
return tuple(schema.domain.dim(i).name for i in range(schema.domain.ndim))
else:
return tuple(self._handle.index_column_names)
return self._handle.dim_names

def _tiledb_attr_names(self) -> Tuple[str, ...]:
"""Reads the attribute names from the schema:
for example, the list of column names in a dataframe.
"""
schema = self._handle.schema
if isinstance(schema, tiledb.ArraySchema):
return tuple(schema.attr(i).name for i in range(schema.nattr))
else:
result = []
for field in schema:
if field.name not in self._handle.index_column_names:
result.append(field.name)
return tuple(result)
return self._handle.attr_names

def _tiledb_domain(self) -> Tuple[Tuple[Any, Any], ...]:
schema = self._handle.schema
if isinstance(schema, tiledb.ArraySchema):
return tuple(schema.domain.dim(i).domain for i in range(0, schema.domain.ndim))
else:
return self._handle.domain
return self._handle.domain

def _soma_reader(
self,
@@ -146,24 +132,20 @@ def _set_reader_coords(self, sr: clib.SOMAArray, coords: Sequence[object]) -> None:
f"coords type {type(coords)} must be a regular sequence,"
" not str or bytes"
)

schema = self._handle.schema
if isinstance(schema, tiledb.ArraySchema):
ndim = schema.domain.ndim
else:
ndim = self._handle.ndim

if len(coords) > ndim:

if len(coords) > self._handle.ndim:
raise ValueError(
f"coords ({len(coords)} elements) must be shorter than ndim"
f" ({ndim})"
f" ({self._handle.ndim})"
)
for i, coord in enumerate(coords):
schema = self._handle.schema

if isinstance(schema, tiledb.ArraySchema):
dim = self._handle.schema.domain.dim(i)
else:
dim = self._handle.schema.field(i)

if not self._set_reader_coord(sr, i, dim, coord):
raise TypeError(
f"coord type {type(coord)} for dimension {dim.name}"
@@ -182,7 +164,6 @@ def _set_reader_coord(
Returns:
True if successful, False if unrecognized.
"""
del dim_idx # Unused.
if coord is None:
return True # No constraint; select all in this dimension

4 changes: 2 additions & 2 deletions apis/python/src/tiledbsoma/_tiledb_object.py
@@ -13,13 +13,13 @@
from typing_extensions import Self

from . import _constants, _tdb_handles
from . import pytiledbsoma as clib
from ._exception import SOMAError
from ._tdb_handles import DataFrameWrapper
from ._types import OpenTimestamp
from ._util import check_type, ms_to_datetime
from .options import SOMATileDBContext
from .options._soma_tiledb_context import _validate_soma_tiledb_context
from ._tdb_handles import DataFrameWrapper
from . import pytiledbsoma as clib

_WrapperType_co = TypeVar(
"_WrapperType_co", bound=_tdb_handles.AnyWrapper, covariant=True
4 changes: 2 additions & 2 deletions apis/python/src/tiledbsoma/io/ingest.py
@@ -1750,15 +1750,15 @@ def _write_matrix_to_denseNDArray(

def _read_nonempty_domain(arr: TileDBArray) -> Any:
try:
return arr._handle.reader.nonempty_domain()
return arr._handle.nonempty_domain()
except SOMAError:
# This means that we're open in write-only mode.
# Reopen the array in read mode.
pass

cls = type(arr)
with cls.open(arr.uri, "r", platform_config=None, context=arr.context) as readarr:
return readarr._handle.reader.nonempty_domain()
return readarr._handle.nonempty_domain()


def _find_sparse_chunk_size(