[c++,python] Remove tiledb-py import for all arrays
* Add vacuuming and consolidation to C++
* Remove TileDB ArraySchema creation
* Rename TileDBArray to SOMAArray to reflect that arrays are no
  longer TileDB-Py arrays but clib.SOMAArray
nguyenv committed May 6, 2024
1 parent 5a442af commit 05d4440
Showing 11 changed files with 48 additions and 251 deletions.
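The behavioral core of the commit: post-write consolidation and vacuuming move from Python (tiledb-py) into the C++ clib.SOMAArray handle. A minimal sketch of the before/after call pattern, condensed from the removed _consolidate/_vacuum helpers and the new consolidate_and_vacuum() calls shown in the diffs below; it is illustrative only, not code from this commit.

```python
import tiledb

# Before this commit: Python looped over modes and drove tiledb-py directly
# (condensed from the removed _consolidate/_vacuum helpers in _soma_array.py below).
def consolidate_and_vacuum_py(uri: str, cfg: tiledb.Config) -> None:
    for mode in ("fragment_meta", "commits"):
        cfg["sm.consolidation.mode"] = mode
        tiledb.consolidate(uri, ctx=tiledb.Ctx(cfg))
        cfg["sm.vacuum.mode"] = mode
        tiledb.vacuum(uri, ctx=tiledb.Ctx(cfg))

# After this commit: the write paths make a single call on the C++ handle,
# e.g. clib_dataframe.consolidate_and_vacuum() in DataFrame.write below.
```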
109 changes: 4 additions & 105 deletions apis/python/src/tiledbsoma/_common_nd_array.py
@@ -7,31 +7,20 @@

from typing import Optional, Sequence, Tuple, Union, cast

import numpy as np
import pyarrow as pa
import somacore
from somacore import options
from typing_extensions import Self

import tiledb

from . import _arrow_types, _util
from ._exception import (
AlreadyExistsError,
NotCreateableError,
is_already_exists_error,
is_not_createable_error,
)
from ._tiledb_array import TileDBArray
from ._soma_array import SOMAArray
from ._types import OpenTimestamp
from .options._soma_tiledb_context import (
SOMATileDBContext,
_validate_soma_tiledb_context,
)
from .options._tiledb_create_options import TileDBCreateOptions


class NDArray(TileDBArray, somacore.NDArray):
class NDArray(SOMAArray, somacore.NDArray):
"""Abstract base for the common behaviors of both kinds of NDArray."""

__slots__ = ()
@@ -93,26 +82,7 @@ def create(
Lifecycle:
Experimental.
"""
context = _validate_soma_tiledb_context(context)
schema = cls._build_tiledb_schema(
type,
shape,
TileDBCreateOptions.from_platform_config(platform_config),
context,
is_sparse=cls.is_sparse,
)
try:
handle = cls._create_internal(uri, schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
if is_not_createable_error(tdbe):
raise NotCreateableError(f"{uri!r} cannot be created")
raise
raise NotImplementedError("must be implemented by child class.")


@property
def shape(self) -> Tuple[int, ...]:
@@ -123,7 +93,7 @@ def shape(self) -> Tuple[int, ...]:
Lifecycle:
Experimental.
"""
return cast(Tuple[int, ...], tuple(self._soma_reader().shape))
return cast(Tuple[int, ...], tuple(self._handle.shape))

def reshape(self, shape: Tuple[int, ...]) -> None:
"""Unsupported operation for this object type.
@@ -133,77 +103,6 @@ def reshape(self, shape: Tuple[int, ...]) -> None:
"""
raise NotImplementedError("reshape operation not implemented.")

@classmethod
def _build_tiledb_schema(
cls,
type: pa.DataType,
shape: Sequence[Union[int, None]],
create_options: TileDBCreateOptions,
context: SOMATileDBContext,
*,
is_sparse: bool,
) -> tiledb.ArraySchema:
_util.check_type("type", type, (pa.DataType,))

if not pa.types.is_primitive(type):
raise TypeError(
f"Unsupported type {type} --"
" SOMA NDArrays only support primtive Arrow types"
)

if not shape:
raise ValueError("SOMA NDArrays must have a nonzero number of dimensions")

dims = []
for dim_idx, dim_shape in enumerate(shape):
dim_name = f"soma_dim_{dim_idx}"
dim_capacity, dim_extent = cls._dim_capacity_and_extent(
dim_name, dim_shape, create_options
)
dim = tiledb.Dim(
name=dim_name,
domain=(0, dim_capacity - 1),
tile=dim_extent,
dtype=np.int64,
filters=create_options.dim_filters_tiledb(
dim_name,
[
dict(
_type="ZstdFilter",
level=create_options.sparse_nd_array_dim_zstd_level,
)
],
),
)
dims.append(dim)
dom = tiledb.Domain(dims, ctx=context.tiledb_ctx)

attrs = [
tiledb.Attr(
name="soma_data",
dtype=_arrow_types.tiledb_type_from_arrow_type(type),
filters=create_options.attr_filters_tiledb("soma_data", ["ZstdFilter"]),
ctx=context.tiledb_ctx,
)
]

cell_order, tile_order = create_options.cell_tile_orders()

# TODO: accept more TileDB array-schema options from create_options
# https://github.com/single-cell-data/TileDB-SOMA/issues/876
return tiledb.ArraySchema(
domain=dom,
attrs=attrs,
sparse=is_sparse,
allows_duplicates=create_options.allows_duplicates,
offsets_filters=create_options.offsets_filters_tiledb(),
validity_filters=create_options.validity_filters_tiledb(),
capacity=create_options.capacity,
tile_order=tile_order,
cell_order=cell_order,
ctx=context.tiledb_ctx,
)

@classmethod
def _dim_capacity_and_extent(
cls,
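For reference, the removed _build_tiledb_schema above emitted a plain tiledb-py ArraySchema. A sketch of roughly what it produced for a one-dimensional sparse float64 NDArray; the filter levels, capacity, and domain below are illustrative placeholders, not values taken from TileDBCreateOptions.

```python
import numpy as np
import tiledb

# Roughly the schema the removed helper built for shape=(100,), type=pa.float64(),
# is_sparse=True; concrete filter levels and capacity are placeholders only.
dim = tiledb.Dim(
    name="soma_dim_0",
    domain=(0, 99),
    tile=100,
    dtype=np.int64,
    filters=tiledb.FilterList([tiledb.ZstdFilter(level=3)]),
)
attr = tiledb.Attr(
    name="soma_data",
    dtype=np.float64,
    filters=tiledb.FilterList([tiledb.ZstdFilter()]),
)
schema = tiledb.ArraySchema(
    domain=tiledb.Domain(dim),
    attrs=[attr],
    sparse=True,
    allows_duplicates=False,
    capacity=100_000,
)
```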
6 changes: 3 additions & 3 deletions apis/python/src/tiledbsoma/_dataframe.py
@@ -20,8 +20,8 @@
from ._exception import SOMAError, map_exception_for_create
from ._query_condition import QueryCondition
from ._read_iters import TableReadIter
from ._soma_array import SOMAArray
from ._tdb_handles import DataFrameWrapper
from ._tiledb_array import TileDBArray
from ._types import NPFloating, NPInteger, OpenTimestamp, Slice, is_slice_of
from .options import SOMATileDBContext
from .options._soma_tiledb_context import _validate_soma_tiledb_context
@@ -32,7 +32,7 @@
Domain = Sequence[AxisDomain]


class DataFrame(TileDBArray, somacore.DataFrame):
class DataFrame(SOMAArray, somacore.DataFrame):
""":class:`DataFrame` is a multi-column table with a user-defined schema. The
schema is expressed as an
`Arrow Schema <https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html>`_,
@@ -454,7 +454,7 @@ def write(
platform_config
)
if tiledb_create_options.consolidate_and_vacuum:
self._consolidate_and_vacuum()
clib_dataframe.consolidate_and_vacuum()

return self

def _set_reader_coord(
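Consolidation after DataFrame.write remains opt-in via the platform config; only the implementation moved into clib. A hedged usage sketch, assuming the usual tiledbsoma platform_config layout ({"tiledb": {"create": ...}}) and a pre-existing DataFrame with a float column named "value":

```python
import pyarrow as pa
import tiledbsoma

# Assumed option layout consumed by TileDBCreateOptions.from_platform_config();
# adjust the keys if your tiledbsoma version differs.
platform_config = {"tiledb": {"create": {"consolidate_and_vacuum": True}}}

with tiledbsoma.open("file:///tmp/my_dataframe", mode="w") as df:
    df.write(
        pa.Table.from_pydict({"soma_joinid": [0, 1], "value": [1.0, 2.0]}),
        platform_config=platform_config,
    )
# With the flag set, write() finishes with clib_dataframe.consolidate_and_vacuum().
```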
10 changes: 5 additions & 5 deletions apis/python/src/tiledbsoma/_dense_nd_array.py
@@ -263,7 +263,7 @@ def write(
"""
_util.check_type("values", values, (pa.Tensor,))

handle = self._handle._handle
clib_dense_array = self._handle._handle

new_coords: List[Union[int, Slice[int], None]] = []
for c in coords:
@@ -280,16 +280,16 @@
if input.flags.f_contiguous
else clib.ResultOrder.rowmajor
)
handle.reset(result_order=order)
clib_dense_array.reset(result_order=order)

self._set_reader_coords(handle, new_coords)
handle.write(input)
self._set_reader_coords(clib_dense_array, new_coords)
clib_dense_array.write(input)

tiledb_create_options = TileDBCreateOptions.from_platform_config(
platform_config
)
if tiledb_create_options.consolidate_and_vacuum:
self._consolidate_and_vacuum()
clib_dense_array.consolidate_and_vacuum()

return self

@classmethod
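The dense write path chooses the clib result order from the memory layout of the incoming data, so Fortran-ordered input is written in its native column-major order. A small sketch of that mapping, assuming clib is the package's pytiledbsoma pybind11 module:

```python
import numpy as np
import tiledbsoma.pytiledbsoma as clib  # the package's pybind11 bindings

def result_order_for(arr: np.ndarray) -> "clib.ResultOrder":
    # Mirrors the branch in DenseNDArray.write: Fortran-contiguous input is
    # written column-major, anything else row-major.
    if arr.flags.f_contiguous:
        return clib.ResultOrder.colmajor
    return clib.ResultOrder.rowmajor

c_order = np.zeros((3, 4))            # C-contiguous  -> rowmajor
f_order = np.asfortranarray(c_order)  # F-contiguous  -> colmajor
```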
apis/python/src/tiledbsoma/_soma_array.py (renamed from _tiledb_array.py)
@@ -3,28 +3,23 @@
#
# Licensed under the MIT License.

from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
from typing import Any, Optional, Sequence, Tuple

import pyarrow as pa
from somacore import options
from typing_extensions import Self

import tiledb

from . import _tdb_handles, _util

# This package's pybind11 code
from . import pytiledbsoma as clib # noqa: E402
from ._arrow_types import tiledb_schema_to_arrow
from ._tiledb_object import TileDBObject
from ._types import OpenTimestamp, is_nonstringy_sequence
from .options._soma_tiledb_context import SOMATileDBContext


class TileDBArray(TileDBObject[_tdb_handles.ArrayWrapper]):
"""Wraps arrays from TileDB-Py by retaining a URI, options, etc.
Also serves as an abstraction layer to hide TileDB-specific details
from the API, unless requested.
class SOMAArray(TileDBObject[_tdb_handles.SOMAArrayWrapper[Any]]):
"""Base class for all SOMAArrays: DataFrame and NDarray.
Lifecycle:
Experimental.
@@ -61,14 +56,7 @@ def schema(self) -> pa.Schema:
Lifecycle:
Experimental.
"""
if isinstance(
self._tiledb_array_schema(),
tiledb.ArraySchema,
):
return tiledb_schema_to_arrow(
self._tiledb_array_schema(), self.uri, self._ctx
)
return self._tiledb_array_schema()
return self._handle.schema
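With the tiledb.ArraySchema branch gone, .schema is always the Arrow schema served by the C++ handle. A short usage sketch, assuming an existing array at the given URI:

```python
import pyarrow as pa
import tiledbsoma

with tiledbsoma.open("file:///tmp/some_soma_array") as arr:
    schema = arr.schema                  # now always a pyarrow.Schema
    assert isinstance(schema, pa.Schema)
    print(schema.names)                  # e.g. ['soma_dim_0', 'soma_data'] for an NDArray
```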

def non_empty_domain(self) -> Tuple[Tuple[Any, Any], ...]:
"""
@@ -81,10 +69,6 @@ def non_empty_domain(self) -> Tuple[Tuple[Any, Any], ...]:
"""
return self._handle.non_empty_domain()

def _tiledb_array_schema(self) -> Union[pa.Schema, tiledb.ArraySchema]:
"""Returns the TileDB array schema, for internal use."""
return self._handle.schema

def _tiledb_array_keys(self) -> Tuple[str, ...]:
"""Return all dim and attr names."""
return self._tiledb_dim_names() + self._tiledb_attr_names()
@@ -102,46 +86,6 @@ def _tiledb_attr_names(self) -> Tuple[str, ...]:
def _tiledb_domain(self) -> Tuple[Tuple[Any, Any], ...]:
return self._handle.domain

def _soma_reader(
self,
*,
schema: Optional[tiledb.ArraySchema] = None,
column_names: Optional[Sequence[str]] = None,
query_condition: Optional[tiledb.QueryCondition] = None,
result_order: Optional[options.ResultOrderStr] = None,
) -> clib.SOMAArray:
"""Constructs a C++ SOMAArray using appropriate context/config/etc."""
# Leave empty arguments out of kwargs to allow C++ constructor defaults to apply, as
# they're not all wrapped in std::optional<>.
kwargs: Dict[str, object] = {}
# if schema:
# kwargs["schema"] = schema
if column_names:
kwargs["column_names"] = column_names
if result_order:
result_order_map = {
"auto": clib.ResultOrder.automatic,
"row-major": clib.ResultOrder.rowmajor,
"column-major": clib.ResultOrder.colmajor,
}
result_order_enum = result_order_map[
options.ResultOrder(result_order).value
]
kwargs["result_order"] = result_order_enum

soma_array = clib.SOMAArray(
self.uri,
name=f"{self} reader",
platform_config=self._ctx.config().dict(),
timestamp=(0, self.tiledb_timestamp_ms),
**kwargs,
)

if query_condition:
soma_array.set_condition(query_condition, self._tiledb_array_schema())

return soma_array
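
The removed _soma_reader above was the old Python-side seam to the C++ reader; among other things it translated somacore result-order strings into clib enums. That mapping, condensed here for reference:

```python
# Condensed from the removed _soma_reader above: translate a somacore
# result-order string into the clib enum before constructing the C++ reader.
from somacore import options
import tiledbsoma.pytiledbsoma as clib

_RESULT_ORDER = {
    "auto": clib.ResultOrder.automatic,
    "row-major": clib.ResultOrder.rowmajor,
    "column-major": clib.ResultOrder.colmajor,
}

def to_clib_result_order(result_order: options.ResultOrderStr) -> "clib.ResultOrder":
    return _RESULT_ORDER[options.ResultOrder(result_order).value]
```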

def _set_reader_coords(self, sr: clib.SOMAArray, coords: Sequence[object]) -> None:
"""Parses the given coords and sets them on the SOMA Reader."""
if not is_nonstringy_sequence(coords):
@@ -193,66 +137,3 @@ def _set_reader_coord(
# If `None`, coord was `slice(None)` and there is no constraint.
return True
return False

@classmethod
def _create_internal(
cls,
uri: str,
schema: tiledb.ArraySchema,
context: SOMATileDBContext,
tiledb_timestamp: Optional[OpenTimestamp],
) -> Union[
_tdb_handles.ArrayWrapper,
_tdb_handles.DataFrameWrapper,
_tdb_handles.DenseNDArrayWrapper,
_tdb_handles.SparseNDArrayWrapper,
]:
"""Creates the TileDB Array for this type and returns an opened handle.
This does the work of creating a TileDB Array with the provided schema
at the given URI, sets the necessary metadata, and returns a handle to
the newly-created array, open for writing.
"""
tiledb.Array.create(uri, schema, ctx=context.tiledb_ctx)
handle = cls._wrapper_type.open(uri, "w", context, tiledb_timestamp)
cls._set_create_metadata(handle)
return handle

def _consolidate_and_vacuum(
self, modes: List[str] = ["fragment_meta", "commits"]
) -> None:
"""
This post-ingestion helper consolidates and vacuums fragment metadata and commit files --
this is quick to do, and positively impacts query performance. It does _not_ consolidate
bulk array data, which is more time-consuming and should be done at the user's opt-in
discretion.
"""

for mode in modes:
self._consolidate(modes=[mode])
self._vacuum(modes=[mode])

def _consolidate(self, modes: List[str] = ["fragment_meta", "commits"]) -> None:
"""
This post-ingestion helper consolidates by default fragment metadata and commit files --
this is quick to do, and positively impacts query performance.
"""

for mode in modes:
cfg = self._ctx.config()
cfg["sm.consolidation.mode"] = mode
ctx = tiledb.Ctx(cfg)

tiledb.consolidate(self.uri, ctx=ctx)

def _vacuum(self, modes: List[str] = ["fragment_meta", "commits"]) -> None:
"""
This post-ingestion helper vacuums by default fragment metadata and commit files. Vacuuming is not multi-process safe and requires coordination that nothing is currently reading the files that will be vacuumed.
"""

for mode in modes:
cfg = self._ctx.config()
cfg["sm.vacuum.mode"] = mode
ctx = tiledb.Ctx(cfg)

tiledb.vacuum(self.uri, ctx=ctx)
