Skip to content

Commit

Permalink
[python] Remove double open for SOMAArray reads (#3293)
Browse files Browse the repository at this point in the history
- Remove `clib.SOMAArray.open` in all `SOMAArray` derived class `read` calls
- Pass query arguments coords, column names, result order, value filter, and platform config that were formerly passed to `clib.SOMAArray.open` or `clib.SOMAArray.reset` instead to `TableReadIter` and `SparseNDArrayRead`
- `_set_coords` on the `ManagedQuery` rather than through `SOMAArray`
- `_arrow_table_reader` and `SparseTensorReadIterBase` initialize a `ManagedQuery` object and set the query arguments listed above; set up the read; and submit reads and return results until the query is complete
  • Loading branch information
nguyenv authored Dec 9, 2024
1 parent 504df5f commit 1f81a86
Show file tree
Hide file tree
Showing 12 changed files with 543 additions and 212 deletions.
30 changes: 7 additions & 23 deletions apis/python/src/tiledbsoma/_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from . import pytiledbsoma as clib
from ._constants import SOMA_JOINID
from ._exception import SOMAError, map_exception_for_create
from ._query_condition import QueryCondition
from ._read_iters import TableReadIter
from ._soma_array import SOMAArray
from ._tdb_handles import DataFrameWrapper
Expand Down Expand Up @@ -722,31 +721,16 @@ def read(
_util.check_unpartitioned(partitions)
self._check_open_read()

handle = self._handle._handle

context = handle.context()
if platform_config is not None:
config = context.tiledb_config.copy()
config.update(platform_config)
context = clib.SOMAContext(config)

sr = clib.SOMADataFrame.open(
uri=handle.uri,
mode=clib.OpenMode.read,
context=context,
column_names=column_names or [],
# TODO: batch_size
return TableReadIter(
array=self,
coords=coords,
column_names=column_names,
result_order=_util.to_clib_result_order(result_order),
timestamp=handle.timestamp and (0, handle.timestamp),
value_filter=value_filter,
platform_config=platform_config,
)

if value_filter is not None:
sr.set_condition(QueryCondition(value_filter), handle.schema)

_util._set_coords(sr, coords)

# TODO: batch_size
return TableReadIter(sr)

def write(
self, values: pa.Table, platform_config: Optional[options.PlatformConfig] = None
) -> Self:
Expand Down
50 changes: 16 additions & 34 deletions apis/python/src/tiledbsoma/_dense_nd_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ._arrow_types import pyarrow_to_carrow_type
from ._common_nd_array import NDArray
from ._exception import SOMAError, map_exception_for_create
from ._read_iters import TableReadIter
from ._tdb_handles import DenseNDArrayWrapper
from ._types import OpenTimestamp, Slice
from ._util import dense_indices_to_shape
Expand Down Expand Up @@ -232,42 +233,20 @@ def read(
data_shape = tuple(handle.shape if use_shape else ned)
target_shape = dense_indices_to_shape(coords, data_shape, result_order)

context = handle.context()
if platform_config is not None:
config = context.tiledb_config.copy()
config.update(platform_config)
context = clib.SOMAContext(config)

sr = clib.SOMADenseNDArray.open(
uri=handle.uri,
mode=clib.OpenMode.read,
context=context,
arrow_table = TableReadIter(
array=self,
coords=coords,
column_names=[],
result_order=_util.to_clib_result_order(result_order),
timestamp=handle.timestamp and (0, handle.timestamp),
)
value_filter=None,
platform_config=platform_config,
).concat()

_util._set_coords(sr, coords)

arrow_tables = []
while True:
arrow_table_piece = sr.read_next()
if not arrow_table_piece:
break
arrow_tables.append(arrow_table_piece)

# For dense arrays there is no zero-output case: attempting to make a test case
# to do that, say by indexing a 10x20 array by positions 888 and 999, results
# in read-time errors of the form
#
# [TileDB::Subarray] Error: Cannot add range to dimension 'soma_dim_0'; Range [888, 888] is
# out of domain bounds [0, 9]
if not arrow_tables:
if arrow_table is None:
raise SOMAError(
"internal error: at least one table-piece should have been returned"
)

arrow_table = pa.concat_tables(arrow_tables)
npval = arrow_table.column("soma_data").to_numpy()
# TODO: as currently coded we're looking at the non-empty domain upper
# bound but not its lower bound. That works fine if data are written at
Expand Down Expand Up @@ -310,7 +289,7 @@ def write(
"""
_util.check_type("values", values, (pa.Tensor,))

clib_dense_array = self._handle._handle
clib_handle = self._handle._handle

# Compute the coordinates for the dense array.
new_coords: List[Union[int, Slice[int], None]] = []
Expand All @@ -331,13 +310,16 @@ def write(
if not input.flags.contiguous:
input = np.ascontiguousarray(input)
order = clib.ResultOrder.rowmajor
clib_dense_array.reset(result_order=order)
_util._set_coords(clib_dense_array, new_coords)
clib_dense_array.write(input)

mq = clib.ManagedQuery(clib_handle, clib_handle.context())
mq.set_layout(order)
_util._set_coords(mq, clib_handle, new_coords)
mq.set_soma_data(input)
mq.submit_write()

tiledb_write_options = TileDBWriteOptions.from_platform_config(platform_config)
if tiledb_write_options.consolidate_and_vacuum:
clib_dense_array.consolidate_and_vacuum()
clib_handle.consolidate_and_vacuum()
return self

@classmethod
Expand Down
30 changes: 7 additions & 23 deletions apis/python/src/tiledbsoma/_point_cloud_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
_revise_domain_for_extent,
)
from ._exception import SOMAError, map_exception_for_create
from ._query_condition import QueryCondition
from ._read_iters import TableReadIter
from ._spatial_dataframe import SpatialDataFrame
from ._spatial_util import (
Expand Down Expand Up @@ -332,31 +331,16 @@ def read(
_util.check_unpartitioned(partitions)
self._check_open_read()

handle = self._handle._handle

context = handle.context()
if platform_config is not None:
config = context.tiledb_config.copy()
config.update(platform_config)
context = clib.SOMAContext(config)

sr = clib.SOMAPointCloudDataFrame.open(
uri=handle.uri,
mode=clib.OpenMode.read,
context=context,
column_names=column_names or [],
# TODO: batch_size
return TableReadIter(
array=self,
coords=coords,
column_names=column_names,
result_order=_util.to_clib_result_order(result_order),
timestamp=handle.timestamp and (0, handle.timestamp),
value_filter=value_filter,
platform_config=platform_config,
)

if value_filter is not None:
sr.set_condition(QueryCondition(value_filter), handle.schema)

_util._set_coords(sr, coords)

# # TODO: batch_size
return TableReadIter(sr)

def read_spatial_region(
self,
region: Optional[options.SpatialRegion] = None,
Expand Down
Loading

0 comments on commit 1f81a86

Please sign in to comment.