diff --git a/pyproject.toml b/pyproject.toml
index b3f31e2..7e07880 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,7 +13,6 @@ readme = "./python-spec/README.md"
 dependencies = [
     "anndata",
     "attrs>=22.1",
-    "numba",
     "numpy>=1.21",
     "pandas",
     "pyarrow",
diff --git a/python-spec/requirements-py3.10.txt b/python-spec/requirements-py3.10.txt
index c2fd0fe..951c0cd 100644
--- a/python-spec/requirements-py3.10.txt
+++ b/python-spec/requirements-py3.10.txt
@@ -5,7 +5,6 @@ exceptiongroup==1.2.1
 h5py==3.11.0
 llvmlite==0.43.0
 natsort==8.4.0
-numba==0.60.0
 numpy==2.0.0
 packaging==24.1
 pandas==2.2.2
diff --git a/python-spec/requirements-py3.11.txt b/python-spec/requirements-py3.11.txt
index 665528f..0dd6c8a 100644
--- a/python-spec/requirements-py3.11.txt
+++ b/python-spec/requirements-py3.11.txt
@@ -4,7 +4,6 @@ attrs==23.2.0
 h5py==3.11.0
 llvmlite==0.43.0
 natsort==8.4.0
-numba==0.60.0
 numpy==2.0.0
 packaging==24.1
 pandas==2.2.2
diff --git a/python-spec/requirements-py3.12.txt b/python-spec/requirements-py3.12.txt
index 0fe2f05..1d0585c 100644
--- a/python-spec/requirements-py3.12.txt
+++ b/python-spec/requirements-py3.12.txt
@@ -4,7 +4,6 @@ attrs==23.2.0
 h5py==3.11.0
 llvmlite==0.43.0
 natsort==8.4.0
-numba==0.60.0
 numpy==2.0.0
 packaging==24.1
 pandas==2.2.2
diff --git a/python-spec/requirements-py3.9.txt b/python-spec/requirements-py3.9.txt
index db2c51a..fc763a1 100644
--- a/python-spec/requirements-py3.9.txt
+++ b/python-spec/requirements-py3.9.txt
@@ -7,7 +7,6 @@ h5py==3.11.0
 jmespath==1.0.1
 llvmlite==0.43.0
 natsort==8.4.0
-numba==0.60.0
 numpy==2.0.0
 packaging==24.1
 pandas==2.2.2
diff --git a/python-spec/src/somacore/ephemeral/__init__.py b/python-spec/src/somacore/ephemeral/__init__.py
deleted file mode 100644
index 932f63f..0000000
--- a/python-spec/src/somacore/ephemeral/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-"""In-memory-only implementations of SOMA types.
-
-These are meant for testing and exploration, where a user may want to do an
-ad-hoc analysis of multiple datasets without having to create a stored
-Collection.
-"""
-
-from .collections import Collection
-from .collections import Experiment
-from .collections import Measurement
-from .collections import Scene
-
-__all__ = (
-    "Collection",
-    "Experiment",
-    "Measurement",
-    "Scene",
-)
diff --git a/python-spec/src/somacore/ephemeral/collections.py b/python-spec/src/somacore/ephemeral/collections.py
deleted file mode 100644
index 68a224d..0000000
--- a/python-spec/src/somacore/ephemeral/collections.py
+++ /dev/null
@@ -1,234 +0,0 @@
-from typing import (
-    Any,
-    Dict,
-    Iterator,
-    NoReturn,
-    Optional,
-    TypeVar,
-)
-
-from typing_extensions import Literal, Self
-
-from .. import base
-from .. import collection
-from .. import coordinates
-from .. import data
-from .. import experiment
-from .. import measurement
-from .. import options
-from .. import scene
-from .. import spatial
-
-_Elem = TypeVar("_Elem", bound=base.SOMAObject)
-
-
-class BaseCollection(collection.BaseCollection[_Elem]):
-    """A memory-backed SOMA Collection for ad-hoc collection building.
-
-    This Collection implementation exists purely in memory. It can be used to
-    build ad-hoc SOMA Collections for one-off analyses, and to combine SOMA
-    datasets from different sources that cannot be added to a Collection that
-    is represented in storage.
-
-    Entries added to this Collection are not "owned" by the collection; their
-    lifecycle is still dictated by the place they were opened from. This
-    collection has no ``context`` and ``close``ing it does nothing.
- """ - - __slots__ = ("_entries", "_metadata") - - def __init__(self, *args: Any, **kwargs: _Elem): - """Creates a new Collection. - - Arguments and kwargs are provided as in the ``dict`` constructor. - """ - self._entries: Dict[str, _Elem] = dict(*args, **kwargs) - self._metadata: Dict[str, Any] = {} - - @property - def uri(self) -> str: - return f"somacore:ephemeral-collection:{id(self):x}" - - @property - def metadata(self) -> Dict[str, Any]: - return self._metadata - - @classmethod - def open(cls, *args, **kwargs) -> NoReturn: - del args, kwargs # All unused - raise TypeError( - "Ephemeral collections are in-memory only and cannot be opened." - ) - - @classmethod - def exists(cls, uri: str, *, context: Any = None) -> Literal[False]: - del uri, context # All unused. - # Ephemeral collections are in-memory only and do not otherwise exist. - return False - - @classmethod - def create(cls, *args, **kwargs) -> Self: - del args, kwargs # All unused - # ThisCollection is in-memory only, so just return a new empty one. - return cls() - - def add_new_collection(self, *args, **kwargs) -> NoReturn: - del args, kwargs # All unused - # TODO: Should we be willing to create Collection-based child elements, - # like Measurement and Experiment? - raise TypeError( - "An ephemeral Collection cannot create its own children;" - " only existing SOMA objects may be added." - ) - - add_new_dataframe = add_new_collection - add_new_sparse_ndarray = add_new_collection - add_new_dense_ndarray = add_new_collection - - @property - def closed(self) -> bool: - return False # With no backing storage, there is nothing to close. - - @property - def mode(self) -> options.OpenMode: - return "w" # This collection is always writable. - - def set( - self, key: str, value: _Elem, *, use_relative_uri: Optional[bool] = None - ) -> Self: - del use_relative_uri # Ignored. 
- self._entries[key] = value - return self - - def __getitem__(self, key: str) -> _Elem: - return self._entries[key] - - def __delitem__(self, key: str) -> None: - del self._entries[key] - - def __iter__(self) -> Iterator[str]: - return iter(self._entries) - - def __len__(self) -> int: - return len(self._entries) - - -class Collection( # type: ignore[misc] # __eq__ false positive - BaseCollection[_Elem], collection.Collection -): - """An in-memory Collection imposing no semantics on the contents.""" - - __slots__ = () - - -_BasicAbstractMeasurement = measurement.Measurement[ - data.DataFrame, - collection.Collection[data.NDArray], - collection.Collection[data.DenseNDArray], - collection.Collection[data.SparseNDArray], - base.SOMAObject, -] -"""The loosest possible constraint of the abstract Measurement type.""" - -_BasicAbstractScene = scene.Scene[ - spatial.MultiscaleImage, - spatial.PointCloudDataFrame, - spatial.GeometryDataFrame, - base.SOMAObject, -] -"""The loosest possible constraint of the abstract Scene type.""" - - -class Measurement( # type: ignore[misc] # __eq__ false positive - BaseCollection[base.SOMAObject], _BasicAbstractMeasurement -): - """An in-memory Collection with Measurement semantics.""" - - __slots__ = () - - -class Scene( # type: ignore[misc] # __eq__ false positive - BaseCollection[base.SOMAObject], _BasicAbstractScene -): - """An in-memory Collection with Scene semantics.""" - - __slots__ = () - - @property - def coordinate_space(self) -> coordinates.CoordinateSpace: - """Coordinate system for this scene.""" - raise NotImplementedError() - - @coordinate_space.setter - def coordinate_space(self, value: coordinates.CoordinateSpace) -> None: - raise NotImplementedError() - - def add_new_geometry_dataframe(self, *args, **kwargs) -> spatial.GeometryDataFrame: - raise NotImplementedError() - - def add_new_multiscale_image(self, *args, **kwargs) -> spatial.MultiscaleImage: - raise NotImplementedError() - - def add_new_point_cloud_dataframe( - self, *args, **kwargs - ) -> spatial.PointCloudDataFrame: - raise NotImplementedError() - - def set_transform_to_geometry_dataframe( - self, *args, **kwargs - ) -> spatial.GeometryDataFrame: - raise NotImplementedError() - - def set_transform_to_multiscale_image( - self, *args, **kwargs - ) -> spatial.MultiscaleImage: - raise NotImplementedError() - - def set_transform_to_point_cloud_dataframe( - self, *args, **kwargs - ) -> spatial.PointCloudDataFrame: - raise NotImplementedError() - - def get_transform_from_geometry_dataframe( - self, *args, **kwargs - ) -> coordinates.CoordinateTransform: - raise NotImplementedError() - - def get_transform_from_multiscale_image( - self, *args, **kwargs - ) -> coordinates.CoordinateTransform: - raise NotImplementedError() - - def get_transform_from_point_cloud_dataframe( - self, *args, **kwargs - ) -> coordinates.CoordinateTransform: - raise NotImplementedError() - - def get_transform_to_geometry_dataframe( - self, *args, **kwargs - ) -> coordinates.CoordinateTransform: - raise NotImplementedError() - - def get_transform_to_multiscale_image( - self, *args, **kwargs - ) -> coordinates.CoordinateTransform: - raise NotImplementedError() - - def get_transform_to_point_cloud_dataframe( - self, *args, **kwargs - ) -> coordinates.CoordinateTransform: - raise NotImplementedError() - - -class Experiment( # type: ignore[misc] # __eq__ false positive - BaseCollection[base.SOMAObject], - experiment.Experiment[ - data.DataFrame, - collection.Collection[_BasicAbstractMeasurement], - 
collection.Collection[_BasicAbstractScene], - base.SOMAObject, - ], -): - """An in-memory Collection with Experiment semantics.""" - - __slots__ = () diff --git a/python-spec/src/somacore/experiment.py b/python-spec/src/somacore/experiment.py index fbdcb51..8869ffa 100644 --- a/python-spec/src/somacore/experiment.py +++ b/python-spec/src/somacore/experiment.py @@ -1,6 +1,8 @@ +from abc import ABC +from abc import abstractmethod from typing import Generic, Optional, TypeVar -from typing_extensions import Final, Self +from typing_extensions import Final from . import _mixin from . import base @@ -9,6 +11,7 @@ from . import measurement from . import query from . import scene +from .query import ExperimentAxisQuery _DF = TypeVar("_DF", bound=data.DataFrame) """An implementation of a DataFrame.""" @@ -20,8 +23,10 @@ """The root SOMA object type of the implementation.""" -class Experiment( - collection.BaseCollection[_RootSO], Generic[_DF, _MeasColl, _SceneColl, _RootSO] +class Experiment( # type: ignore[misc] # __eq__ false positive + collection.BaseCollection[_RootSO], + Generic[_DF, _MeasColl, _SceneColl, _RootSO], + ABC, ): """A collection subtype representing an annotated 2D matrix of measurements. @@ -33,22 +38,6 @@ class Experiment( Lifecycle: maturing """ - # This class is implemented as a mixin to be used with SOMA classes. - # For example, a SOMA implementation would look like this: - # - # # This type-ignore comment will always be needed due to limitations - # # of type annotations; it is (currently) expected. - # class Experiment( # type: ignore[type-var] - # ImplBaseCollection[ImplSOMAObject], - # somacore.Experiment[ - # ImplDataFrame, # _DF - # ImplMeasurement, # _MeasColl - # ImplScene, # _SceneColl - # ImplSOMAObject, # _RootSO - # ], - # ): - # ... - __slots__ = () soma_type: Final = "SOMAExperiment" # type: ignore[misc] @@ -77,24 +66,18 @@ class Experiment( ``scene_id`` and ``False`` otherwise. """ + @abstractmethod def axis_query( self, measurement_name: str, *, obs_query: Optional[query.AxisQuery] = None, var_query: Optional[query.AxisQuery] = None, - ) -> "query.ExperimentAxisQuery[Self]": + ) -> ExperimentAxisQuery: """Creates an axis query over this experiment. See :class:`query.ExperimentAxisQuery` for details on usage. Lifecycle: maturing """ - # mypy doesn't quite understand descriptors so it issues a spurious - # error here. 
- return query.ExperimentAxisQuery( # type: ignore[type-var] - self, - measurement_name, - obs_query=obs_query or query.AxisQuery(), - var_query=var_query or query.AxisQuery(), - ) + raise NotImplementedError diff --git a/python-spec/src/somacore/query/__init__.py b/python-spec/src/somacore/query/__init__.py index 41598d9..808b704 100644 --- a/python-spec/src/somacore/query/__init__.py +++ b/python-spec/src/somacore/query/__init__.py @@ -3,10 +3,12 @@ ExperimentAxisQuery = query.ExperimentAxisQuery AxisColumnNames = query.AxisColumnNames +AxisIndexer = query.AxisIndexer AxisQuery = axis.AxisQuery __all__ = ( "ExperimentAxisQuery", "AxisColumnNames", + "AxisIndexer", "AxisQuery", ) diff --git a/python-spec/src/somacore/query/_eager_iter.py b/python-spec/src/somacore/query/_eager_iter.py deleted file mode 100644 index c42e52c..0000000 --- a/python-spec/src/somacore/query/_eager_iter.py +++ /dev/null @@ -1,51 +0,0 @@ -from concurrent import futures -from typing import Iterator, Optional, TypeVar - -_T = TypeVar("_T") - - -class EagerIterator(Iterator[_T]): - def __init__( - self, - iterator: Iterator[_T], - pool: Optional[futures.Executor] = None, - ): - super().__init__() - self.iterator = iterator - self._pool = pool or futures.ThreadPoolExecutor() - self._own_pool = pool is None - self._preload_future = self._pool.submit(self.iterator.__next__) - - def __next__(self) -> _T: - stopped = False - try: - if self._preload_future.cancel(): - # If `.cancel` returns True, cancellation was successful. - # The self.iterator.__next__ call has not yet been started, - # and will never be started, so we can compute next ourselves. - # This prevents deadlocks if the thread pool is too small - # and we can never create a preload thread. - return next(self.iterator) - # `.cancel` returned false, so the preload is already running. - # Just wait for it. - return self._preload_future.result() - except StopIteration: - self._cleanup() - stopped = True - raise - finally: - if not stopped: - # If we have more to do, go for the next thing. - self._preload_future = self._pool.submit(self.iterator.__next__) - - def _cleanup(self) -> None: - if self._own_pool: - self._pool.shutdown() - - def __del__(self) -> None: - # Ensure the threadpool is cleaned up in the case where the - # iterator is not exhausted. For more information on __del__: - # https://docs.python.org/3/reference/datamodel.html#object.__del__ - self._cleanup() - super_del = getattr(super(), "__del__", lambda: None) - super_del() diff --git a/python-spec/src/somacore/query/_fast_csr.py b/python-spec/src/somacore/query/_fast_csr.py deleted file mode 100644 index 99a784d..0000000 --- a/python-spec/src/somacore/query/_fast_csr.py +++ /dev/null @@ -1,275 +0,0 @@ -import os -from concurrent import futures -from typing import List, NamedTuple, Tuple, Type, cast - -import numba -import numba.typed -import numpy as np -import numpy.typing as npt -import pyarrow as pa -from scipy import sparse - -from .. import data as scd -from . import _eager_iter -from . 
import types - - -def read_csr( - matrix: scd.SparseNDArray, - obs_joinids: pa.IntegerArray, - var_joinids: pa.IntegerArray, - index_factory: types.IndexFactory, -) -> "AccumulatedCSR": - if not isinstance(matrix, scd.SparseNDArray) or matrix.ndim != 2: - raise TypeError("Can only read from a 2D SparseNDArray") - - max_workers = (os.cpu_count() or 4) + 2 - with futures.ThreadPoolExecutor(max_workers=max_workers) as pool: - acc = _CSRAccumulator( - obs_joinids=obs_joinids, - var_joinids=var_joinids, - pool=pool, - index_factory=index_factory, - ) - for tbl in _eager_iter.EagerIterator( - matrix.read((obs_joinids, var_joinids)).tables(), - pool=pool, - ): - acc.append(tbl["soma_dim_0"], tbl["soma_dim_1"], tbl["soma_data"]) - - return acc.finalize() - - -class AccumulatedCSR(NamedTuple): - """ - Private. - - Return type for the _CSRAccumulator.finalize method. - Contains a sparse CSR's constituent elements. - """ - - data: npt.NDArray[np.number] - indptr: npt.NDArray[np.integer] - indices: npt.NDArray[np.integer] - shape: Tuple[int, int] - - def to_scipy(self) -> sparse.csr_matrix: - """Create a Scipy sparse.csr_matrix from component elements. - - Conceptually, this is identical to:: - - sparse.csr_matrix((data, indices, indptr), shape=shape) - - This ugliness is to bypass the O(N) scan that - :meth:`sparse._cs_matrix.__init__` - does when a new compressed matrix is created. - - See `SciPy bug 11496 ` - for details. - """ - matrix = sparse.csr_matrix.__new__(sparse.csr_matrix) - matrix.data = self.data - matrix.indptr = self.indptr - matrix.indices = self.indices - matrix._shape = self.shape - return matrix - - -class _CSRAccumulator: - """ - Fast accumulator of a CSR, based upon COO input. - """ - - def __init__( - self, - obs_joinids: pa.IntegerArray, - var_joinids: pa.IntegerArray, - pool: futures.Executor, - index_factory: types.IndexFactory, - ): - self.obs_joinids = obs_joinids - self.var_joinids = var_joinids - self.pool = pool - - self.shape: Tuple[int, int] = (len(self.obs_joinids), len(self.var_joinids)) - self.obs_indexer = index_factory(self.obs_joinids) - self.var_indexer = index_factory(self.var_joinids) - self.row_length: npt.NDArray[np.int64] = np.zeros( - (self.shape[0],), dtype=_select_dtype(self.shape[1]) - ) - - # COO accumulated chunks, stored as list of triples (row_ind, col_ind, data) - self.coo_chunks: List[ - Tuple[ - npt.NDArray[np.integer], # row_ind - npt.NDArray[np.integer], # col_ind - npt.NDArray[np.number], # data - ] - ] = [] - - def append( - self, - row_joinids: pa.Array, - col_joinids: pa.Array, - data: pa.Array, - ) -> None: - """ - At accumulation time, do several things: - - * re-index to positional indices, and if possible, cast to smaller dtype - to minimize memory footprint (at cost of some amount of time) - * accumulate column counts by row, i.e., build the basis of the indptr - * cache the tuple of data, row, col - """ - rows_future = self.pool.submit( - _reindex_and_cast, - self.obs_indexer, - row_joinids.to_numpy(), - _select_dtype(self.shape[0]), - ) - cols_future = self.pool.submit( - _reindex_and_cast, - self.var_indexer, - col_joinids.to_numpy(), - _select_dtype(self.shape[1]), - ) - row_ind = rows_future.result() - col_ind = cols_future.result() - self.coo_chunks.append((row_ind, col_ind, data.to_numpy())) - _accum_row_length(self.row_length, row_ind) - - def finalize(self) -> AccumulatedCSR: - nnz = sum(len(chunk[2]) for chunk in self.coo_chunks) - index_dtype = _select_dtype(nnz) - if nnz == 0: - # There is no way to infer matrix dtype, so 
use a default and return - # an empty matrix. Float32 is used as a default type, as it is most - # compatible with AnnData expectations. - empty = sparse.csr_matrix((0, 0), dtype=np.float32) - return AccumulatedCSR( - data=empty.data, - indptr=empty.indptr, - indices=empty.indices, - shape=self.shape, - ) - - # cumsum row lengths to get indptr - indptr = np.empty((self.shape[0] + 1,), dtype=index_dtype) - indptr[0:1] = 0 - np.cumsum(self.row_length, out=indptr[1:]) - - # Parallel copy of data and column indices - indices = np.empty((nnz,), dtype=index_dtype) - data = np.empty((nnz,), dtype=self.coo_chunks[0][2].dtype) - - # Empirically determined value. Needs to be large enough for reasonable - # concurrency, without excessive write cache conflict. Controls the - # number of rows that are processed in a single thread, and therefore - # is the primary tuning parameter related to concurrency. - row_rng_mask_bits = 18 - - n_jobs = (self.shape[0] >> row_rng_mask_bits) + 1 - chunk_list = numba.typed.List(self.coo_chunks) - futures.wait( - [ - self.pool.submit( - _copy_chunklist_range, - chunk_list, - data, - indices, - indptr, - row_rng_mask_bits, - job, - ) - for job in range(n_jobs) - ] - ) - _finalize_indptr(indptr) - return AccumulatedCSR( - data=data, indptr=indptr, indices=indices, shape=self.shape - ) - - -@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined] -def _accum_row_length( - row_length: npt.NDArray[np.int64], row_ind: npt.NDArray[np.int64] -) -> None: - for rind in row_ind: - row_length[rind] += 1 - - -@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined] -def _copy_chunk_range( - row_ind_chunk: npt.NDArray[np.signedinteger], - col_ind_chunk: npt.NDArray[np.signedinteger], - data_chunk: npt.NDArray[np.number], - data: npt.NDArray[np.number], - indices: npt.NDArray[np.signedinteger], - indptr: npt.NDArray[np.signedinteger], - row_rng_mask: int, - row_rng_val: int, -): - for n in range(len(data_chunk)): - row = row_ind_chunk[n] - if (row & row_rng_mask) != row_rng_val: - continue - ptr = indptr[row] - indices[ptr] = col_ind_chunk[n] - data[ptr] = data_chunk[n] - indptr[row] += 1 - - -@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined] -def _copy_chunklist_range( - chunk_list: numba.typed.List, - data: npt.NDArray[np.number], - indices: npt.NDArray[np.signedinteger], - indptr: npt.NDArray[np.signedinteger], - row_rng_mask_bits: int, - job: int, -): - assert row_rng_mask_bits >= 1 and row_rng_mask_bits < 64 - row_rng_mask = (2**64 - 1) >> row_rng_mask_bits << row_rng_mask_bits - row_rng_val = job << row_rng_mask_bits - for row_ind_chunk, col_ind_chunk, data_chunk in chunk_list: - _copy_chunk_range( - row_ind_chunk, - col_ind_chunk, - data_chunk, - data, - indices, - indptr, - row_rng_mask, - row_rng_val, - ) - - -@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined] -def _finalize_indptr(indptr: npt.NDArray[np.signedinteger]): - prev = 0 - for r in range(len(indptr)): - t = indptr[r] - indptr[r] = prev - prev = t - - -def _select_dtype( - maxval: int, -) -> Type[np.signedinteger]: - """ - Ascertain the "best" dtype for a zero-based index. Given our - goal of minimizing memory use, "best" is currently defined as - smallest. 
- """ - if maxval > np.iinfo(np.int32).max: - return np.int64 - else: - return np.int32 - - -def _reindex_and_cast( - index: types.IndexLike, ids: npt.NDArray[np.int64], target_dtype: npt.DTypeLike -) -> npt.NDArray[np.int64]: - return cast( - npt.NDArray[np.int64], index.get_indexer(ids).astype(target_dtype, copy=False) - ) diff --git a/python-spec/src/somacore/query/query.py b/python-spec/src/somacore/query/query.py index a42e190..51e01ef 100644 --- a/python-spec/src/somacore/query/query.py +++ b/python-spec/src/somacore/query/query.py @@ -1,39 +1,31 @@ -import enum -from concurrent import futures +from abc import ABC +from abc import abstractmethod from typing import ( Any, - Callable, - Dict, - Generic, Mapping, Optional, Sequence, - Tuple, - TypeVar, Union, - cast, - overload, ) -import anndata -import attrs import numpy as np import numpy.typing as npt -import pandas as pd import pyarrow as pa -import pyarrow.compute as pacomp -from scipy import sparse -from typing_extensions import Literal, Protocol, Self, TypedDict +from anndata import AnnData +from typing_extensions import Protocol, Self, TypedDict -from .. import data +from .. import DataFrame +from .. import ReadIter +from .. import SparseRead from .. import measurement -from .. import options from .. import types as base_types -from . import _fast_csr -from . import axis -from . import types +from ..options import BatchSize +from ..options import PlatformConfig +from ..options import ReadPartitions +from ..options import ResultOrder +from ..options import ResultOrderStr -_RO_AUTO = options.ResultOrder.AUTO +_RO_AUTO = ResultOrder.AUTO class AxisColumnNames(TypedDict, total=False): @@ -49,11 +41,7 @@ class AxisColumnNames(TypedDict, total=False): """var columns to use. All columns if ``None`` or not present.""" -_Exp = TypeVar("_Exp", bound="_Experimentish") -"""TypeVar for the concrete type of an experiment-like object.""" - - -class ExperimentAxisQuery(Generic[_Exp]): +class ExperimentAxisQuery(ABC): """Axis-based query against a SOMA Experiment. ExperimentAxisQuery allows easy selection and extraction of data from a @@ -64,254 +52,164 @@ class ExperimentAxisQuery(Generic[_Exp]): var value and/or coordinates. Slicing on :class:`SparseNDArray` ``X`` matrices is supported; :class:`DenseNDArray` is not supported at this time. - IMPORTANT: this class is not thread-safe. - - IMPORTANT: this query class assumes it can store the full result of both - axis dataframe queries in memory, and only provides incremental access to - the underlying X NDArray. API features such as ``n_obs`` and ``n_vars`` - codify this in the API. - - IMPORTANT: you must call ``close()`` on any instance of this class to - release underlying resources. The ExperimentAxisQuery is a context manager, - and it is recommended that you use the following pattern to make this easy - and safe:: - - with ExperimentAxisQuery(...) as query: - ... - - This base query implementation is designed to work against any SOMA - implementation that fulfills the basic APIs. A SOMA implementation may - include a custom query implementation optimized for its own use. 
- Lifecycle: maturing """ - def __init__( - self, - experiment: _Exp, - measurement_name: str, - *, - obs_query: axis.AxisQuery = axis.AxisQuery(), - var_query: axis.AxisQuery = axis.AxisQuery(), - index_factory: types.IndexFactory = pd.Index, - ): - if measurement_name not in experiment.ms: - raise ValueError("Measurement does not exist in the experiment") - - # Users often like to pass `foo=None` and we should let them - obs_query = obs_query or axis.AxisQuery() - var_query = var_query or axis.AxisQuery() - - self.experiment = experiment - self.measurement_name = measurement_name - - self._matrix_axis_query = _MatrixAxisQuery(obs=obs_query, var=var_query) - self._joinids = _JoinIDCache(self) - self._indexer = AxisIndexer( - self, - index_factory=index_factory, - ) - self._index_factory = index_factory - self._threadpool_: Optional[futures.ThreadPoolExecutor] = None - + @abstractmethod def obs( self, *, column_names: Optional[Sequence[str]] = None, - batch_size: options.BatchSize = options.BatchSize(), - partitions: Optional[options.ReadPartitions] = None, - result_order: options.ResultOrderStr = _RO_AUTO, - platform_config: Optional[options.PlatformConfig] = None, - ) -> data.ReadIter[pa.Table]: + batch_size: BatchSize = BatchSize(), + partitions: Optional[ReadPartitions] = None, + result_order: ResultOrderStr = _RO_AUTO, + platform_config: Optional[PlatformConfig] = None, + ) -> ReadIter[pa.Table]: """Returns ``obs`` as an `Arrow table `_ iterator. Lifecycle: maturing """ - obs_query = self._matrix_axis_query.obs - return self._obs_df.read( - obs_query.coords, - value_filter=obs_query.value_filter, - column_names=column_names, - batch_size=batch_size, - partitions=partitions, - result_order=result_order, - platform_config=platform_config, - ) + ... + @abstractmethod def var( self, *, column_names: Optional[Sequence[str]] = None, - batch_size: options.BatchSize = options.BatchSize(), - partitions: Optional[options.ReadPartitions] = None, - result_order: options.ResultOrderStr = _RO_AUTO, - platform_config: Optional[options.PlatformConfig] = None, - ) -> data.ReadIter[pa.Table]: + batch_size: BatchSize = BatchSize(), + partitions: Optional[ReadPartitions] = None, + result_order: ResultOrderStr = _RO_AUTO, + platform_config: Optional[PlatformConfig] = None, + ) -> ReadIter[pa.Table]: """Returns ``var`` as an `Arrow table `_ iterator. Lifecycle: maturing """ - var_query = self._matrix_axis_query.var - return self._var_df.read( - var_query.coords, - value_filter=var_query.value_filter, - column_names=column_names, - batch_size=batch_size, - partitions=partitions, - result_order=result_order, - platform_config=platform_config, - ) + ... + @abstractmethod def obs_joinids(self) -> pa.IntegerArray: """Returns ``obs`` ``soma_joinids`` as an Arrow array. Lifecycle: maturing """ - return self._joinids.obs + ... + @abstractmethod def var_joinids(self) -> pa.IntegerArray: """Returns ``var`` ``soma_joinids`` as an Arrow array. Lifecycle: maturing """ - return self._joinids.var + ... @property + @abstractmethod def n_obs(self) -> int: """The number of ``obs`` axis query results. Lifecycle: maturing """ - return len(self.obs_joinids()) + ... @property + @abstractmethod def n_vars(self) -> int: """The number of ``var`` axis query results. Lifecycle: maturing """ - return len(self.var_joinids()) + ... @property + @abstractmethod def indexer(self) -> "AxisIndexer": """A ``soma_joinid`` indexer for both ``obs`` and ``var`` axes. Lifecycle: maturing """ - return self._indexer + ... 
+ @abstractmethod def X( self, layer_name: str, *, - batch_size: options.BatchSize = options.BatchSize(), - partitions: Optional[options.ReadPartitions] = None, - result_order: options.ResultOrderStr = _RO_AUTO, - platform_config: Optional[options.PlatformConfig] = None, - ) -> data.SparseRead: + batch_size: BatchSize = BatchSize(), + partitions: Optional[ReadPartitions] = None, + result_order: ResultOrderStr = _RO_AUTO, + platform_config: Optional[PlatformConfig] = None, + ) -> SparseRead: """Returns an ``X`` layer as a sparse read. Args: layer_name: The X layer name to return. batch_size: The size of batches that should be returned from a read. - See :class:`options.BatchSize` for details. + See :class:`BatchSize` for details. partitions: Specifies that this is part of a partitioned read, and which partition to include, if present. result_order: the order to return results, specified as a - :class:`~options.ResultOrder` or its string value. + :class:`~ResultOrder` or its string value. platform_config: platform-specific configuration; keys are SOMA implementation names. Lifecycle: maturing """ - try: - x_layer = self._ms.X[layer_name] - except KeyError as ke: - raise KeyError(f"{layer_name} is not present in X") from ke - if not isinstance(x_layer, data.SparseNDArray): - raise TypeError("X layers may only be sparse arrays") - - self._joinids.preload(self._threadpool) - return x_layer.read( - (self._joinids.obs, self._joinids.var), - batch_size=batch_size, - partitions=partitions, - result_order=result_order, - platform_config=platform_config, - ) - - def obsp(self, layer: str) -> data.SparseRead: + ... + + @abstractmethod + def obsp(self, layer: str) -> SparseRead: """Returns an ``obsp`` layer as a sparse read. Lifecycle: maturing """ - return self._axisp_inner(_Axis.OBS, layer) + ... - def varp(self, layer: str) -> data.SparseRead: + @abstractmethod + def varp(self, layer: str) -> SparseRead: """Returns a ``varp`` layer as a sparse read. Lifecycle: maturing """ - return self._axisp_inner(_Axis.VAR, layer) + ... - def obsm(self, layer: str) -> data.SparseRead: + @abstractmethod + def obsm(self, layer: str) -> SparseRead: """Returns an ``obsm`` layer as a sparse read. Lifecycle: maturing """ - return self._axism_inner(_Axis.OBS, layer) + ... - def varm(self, layer: str) -> data.SparseRead: + @abstractmethod + def varm(self, layer: str) -> SparseRead: """Returns a ``varm`` layer as a sparse read. Lifecycle: maturing """ - return self._axism_inner(_Axis.VAR, layer) + ... + @abstractmethod def obs_scene_ids(self) -> pa.Array: """Returns a pyarrow array with scene ids that contain obs from this query. Lifecycle: experimental """ - try: - obs_scene = self.experiment.obs_spatial_presence - except KeyError as ke: - raise KeyError("Missing obs_scene") from ke - if not isinstance(obs_scene, data.DataFrame): - raise TypeError("obs_scene must be a dataframe.") - - full_table = obs_scene.read( - coords=((_Axis.OBS.getattr_from(self._joinids), slice(None))), - result_order=options.ResultOrder.COLUMN_MAJOR, - value_filter="data != 0", - ).concat() - - return pacomp.unique(full_table["scene_id"]) + ... + @abstractmethod def var_scene_ids(self) -> pa.Array: """Return a pyarrow array with scene ids that contain var from this query. 
Lifecycle: experimental """ - try: - var_scene = self._ms.var_spatial_presence - except KeyError as ke: - raise KeyError("Missing var_scene") from ke - if not isinstance(var_scene, data.DataFrame): - raise TypeError("var_scene must be a dataframe.") - - full_table = var_scene.read( - coords=((_Axis.OBS.getattr_from(self._joinids), slice(None))), - result_order=options.ResultOrder.COLUMN_MAJOR, - value_filter="data != 0", - ).concat() - - return pacomp.unique(full_table["scene_id"]) + ... + @abstractmethod def to_anndata( self, X_name: str, @@ -323,7 +221,7 @@ def to_anndata( varm_layers: Sequence[str] = (), varp_layers: Sequence[str] = (), drop_levels: bool = False, - ) -> anndata.AnnData: + ) -> AnnData: """ Executes the query and return result as an ``AnnData`` in-memory object. @@ -349,29 +247,11 @@ def to_anndata( Lifecycle: maturing """ - ad = self._read( - X_name, - column_names=column_names or AxisColumnNames(obs=None, var=None), - X_layers=X_layers, - obsm_layers=obsm_layers, - obsp_layers=obsp_layers, - varm_layers=varm_layers, - varp_layers=varp_layers, - ).to_anndata() - - # Drop unused categories on axis dataframes if requested - if drop_levels: - for name in ad.obs: - if ad.obs[name].dtype.name == "category": - ad.obs[name] = ad.obs[name].cat.remove_unused_categories() - for name in ad.var: - if ad.var[name].dtype.name == "category": - ad.var[name] = ad.var[name].cat.remove_unused_categories() - - return ad + ... # Context management + @abstractmethod def close(self) -> None: """Releases resources associated with this query. @@ -379,509 +259,48 @@ def close(self) -> None: Lifecycle: maturing """ - # Because this may be called during ``__del__`` when we might be getting - # disassembled, sometimes ``_threadpool_`` is simply missing. - # Only try to shut it down if it still exists. - pool = getattr(self, "_threadpool_", None) - if pool is None: - return - pool.shutdown() - self._threadpool_ = None - - def __enter__(self) -> Self: - return self - - def __exit__(self, *_: Any) -> None: - self.close() - - def __del__(self) -> None: - """Ensure that we're closed when our last ref disappears.""" - self.close() - # If any superclass in our MRO has a __del__, call it. - sdel = getattr(super(), "__del__", lambda: None) - sdel() - - # Internals - - def _read( - self, - X_name: str, - *, - column_names: AxisColumnNames, - X_layers: Sequence[str], - obsm_layers: Sequence[str] = (), - obsp_layers: Sequence[str] = (), - varm_layers: Sequence[str] = (), - varp_layers: Sequence[str] = (), - ) -> "_AxisQueryResult": - """Reads the entire query result in memory. + ... - This is a low-level routine intended to be used by loaders for other - in-core formats, such as AnnData, which can be created from the - resulting objects. + @abstractmethod + def __enter__(self) -> Self: ... - Args: - X_name: The X layer to read and return in the ``X`` slot. - column_names: The columns in the ``var`` and ``obs`` dataframes - to read. - X_layers: Additional X layers to read and return - in the ``layers`` slot. - obsm_layers: - Additional obsm layers to read and return in the obsm slot. - obsp_layers: - Additional obsp layers to read and return in the obsp slot. - varm_layers: - Additional varm layers to read and return in the varm slot. - varp_layers: - Additional varp layers to read and return in the varp slot. 
- """ - x_collection = self._ms.X - all_x_names = [X_name] + list(X_layers) - all_x_arrays: Dict[str, data.SparseNDArray] = {} - for _xname in all_x_names: - if not isinstance(_xname, str) or not _xname: - raise ValueError("X layer names must be specified as a string.") - if _xname not in x_collection: - raise ValueError("Unknown X layer name") - x_array = x_collection[_xname] - if not isinstance(x_array, data.SparseNDArray): - raise NotImplementedError("Dense array unsupported") - all_x_arrays[_xname] = x_array - - def _read_axis_mappings(fn, axis, keys: Sequence[str]) -> Dict[str, np.ndarray]: - return {key: fn(axis, key) for key in keys} - - obsm_ft = self._threadpool.submit( - _read_axis_mappings, self._axism_inner_ndarray, _Axis.OBS, obsm_layers - ) - obsp_ft = self._threadpool.submit( - _read_axis_mappings, self._axisp_inner_ndarray, _Axis.OBS, obsp_layers - ) - varm_ft = self._threadpool.submit( - _read_axis_mappings, self._axism_inner_ndarray, _Axis.VAR, varm_layers - ) - varp_ft = self._threadpool.submit( - _read_axis_mappings, self._axisp_inner_ndarray, _Axis.VAR, varp_layers - ) - - obs_table, var_table = self._read_both_axes(column_names) - - x_matrices = { - _xname: _fast_csr.read_csr( - all_x_arrays[_xname], - self.obs_joinids(), - self.var_joinids(), - index_factory=self._index_factory, - ).to_scipy() - for _xname in all_x_arrays - } - - x = x_matrices.pop(X_name) - - obs = obs_table.to_pandas() - obs.index = obs.index.astype(str) - - var = var_table.to_pandas() - var.index = var.index.astype(str) - - return _AxisQueryResult( - obs=obs, - var=var, - X=x, - obsm=obsm_ft.result(), - obsp=obsp_ft.result(), - varm=varm_ft.result(), - varp=varp_ft.result(), - X_layers=x_matrices, - ) - - def _read_both_axes( - self, - column_names: AxisColumnNames, - ) -> Tuple[pa.Table, pa.Table]: - """Reads both axes in their entirety, ensuring soma_joinid is retained.""" - obs_ft = self._threadpool.submit( - self._read_axis_dataframe, - _Axis.OBS, - column_names, - ) - var_ft = self._threadpool.submit( - self._read_axis_dataframe, - _Axis.VAR, - column_names, - ) - return obs_ft.result(), var_ft.result() - - def _read_axis_dataframe( - self, - axis: "_Axis", - axis_column_names: AxisColumnNames, - ) -> pa.Table: - """Reads the specified axis. Will cache join IDs if not present.""" - column_names = axis_column_names.get(axis.value) - - axis_df = axis.getattr_from(self, pre="_", suf="_df") - assert isinstance(axis_df, data.DataFrame) - axis_query = axis.getattr_from(self._matrix_axis_query) - - # If we can cache join IDs, prepare to add them to the cache. - joinids_cached = self._joinids._is_cached(axis) - query_columns = column_names - added_soma_joinid_to_columns = False - if ( - not joinids_cached - and column_names is not None - and "soma_joinid" not in column_names - ): - # If we want to fill the join ID cache, ensure that we query the - # soma_joinid column so that it is included in the results. - # We'll filter it out later. - query_columns = ["soma_joinid"] + list(column_names) - added_soma_joinid_to_columns = True - - # Do the actual query. - arrow_table = axis_df.read( - coords=axis_query.coords, - value_filter=axis_query.value_filter, - column_names=query_columns, - ).concat() - - # Update the cache if needed. We can do this because no matter what - # other columns are queried for, the contents of the ``soma_joinid`` - # column will be the same and can be safely stored. 
- if not joinids_cached: - setattr( - self._joinids, - axis.value, - arrow_table.column("soma_joinid").combine_chunks(), - ) - - # Drop soma_joinid column if we added it solely for use in filling - # the joinid cache. - if added_soma_joinid_to_columns: - arrow_table = arrow_table.drop(["soma_joinid"]) - return arrow_table - - def _axisp_inner( - self, - axis: "_Axis", - layer: str, - ) -> data.SparseRead: - p_name = f"{axis.value}p" - try: - axisp = axis.getitem_from(self._ms, suf="p") - except KeyError as ke: - raise ValueError(f"Measurement does not contain {p_name} data") from ke - - try: - ap_layer = axisp[layer] - except KeyError as ke: - raise ValueError(f"layer {layer!r} is not available in {p_name}") from ke - if not isinstance(ap_layer, data.SparseNDArray): - raise TypeError( - f"Unexpected SOMA type {type(ap_layer).__name__}" - f" stored in {p_name} layer {layer!r}" - ) - - joinids = axis.getattr_from(self._joinids) - return ap_layer.read((joinids, joinids)) - - def _axism_inner( - self, - axis: "_Axis", - layer: str, - ) -> data.SparseRead: - m_name = f"{axis.value}m" - - try: - axism = axis.getitem_from(self._ms, suf="m") - except KeyError: - raise ValueError(f"Measurement does not contain {m_name} data") from None - - try: - axism_layer = axism[layer] - except KeyError as ke: - raise ValueError(f"layer {layer!r} is not available in {m_name}") from ke - - if not isinstance(axism_layer, data.SparseNDArray): - raise TypeError(f"Unexpected SOMA type stored in '{m_name}' layer") - - joinids = axis.getattr_from(self._joinids) - return axism_layer.read((joinids, slice(None))) - - def _convert_to_ndarray( - self, axis: "_Axis", table: pa.Table, n_row: int, n_col: int - ) -> np.ndarray: - indexer = cast( - Callable[[_Numpyable], npt.NDArray[np.intp]], - axis.getattr_from(self.indexer, pre="by_"), - ) - idx = indexer(table["soma_dim_0"]) - z: np.ndarray = np.zeros(n_row * n_col, dtype=np.float32) - np.put(z, idx * n_col + table["soma_dim_1"], table["soma_data"]) - return z.reshape(n_row, n_col) - - def _axisp_inner_ndarray( - self, - axis: "_Axis", - layer: str, - ) -> np.ndarray: - n_row = n_col = len(axis.getattr_from(self._joinids)) - - table = self._axisp_inner(axis, layer).tables().concat() - return self._convert_to_ndarray(axis, table, n_row, n_col) + @abstractmethod + def __exit__(self, *_: Any) -> None: ... - def _axism_inner_ndarray( - self, - axis: "_Axis", - layer: str, - ) -> np.ndarray: - table = self._axism_inner(axis, layer).tables().concat() - - n_row = len(axis.getattr_from(self._joinids)) - n_col = len(table["soma_dim_1"].unique()) - - return self._convert_to_ndarray(axis, table, n_row, n_col) - @property - def _obs_df(self) -> data.DataFrame: - return self.experiment.obs - - @property - def _ms(self) -> measurement.Measurement: - return self.experiment.ms[self.measurement_name] - - @property - def _var_df(self) -> data.DataFrame: - return self._ms.var - - @property - def _threadpool(self) -> futures.ThreadPoolExecutor: - """ - Returns the threadpool provided by the experiment's context. - If not available, creates a thread pool just in time.""" - context = self.experiment.context - if context and context.threadpool: - return context.threadpool - - if self._threadpool_ is None: - self._threadpool_ = futures.ThreadPoolExecutor() - return self._threadpool_ - - -# Private internal data structures - - -@attrs.define(frozen=True) -class _AxisQueryResult: - """The result of running :meth:`ExperimentAxisQuery.read`. 
Private."""
-
-    obs: pd.DataFrame
-    """Experiment.obs query slice, as a pandas DataFrame"""
-    var: pd.DataFrame
-    """Experiment.ms[...].var query slice, as a pandas DataFrame"""
-    X: sparse.csr_matrix
-    """Experiment.ms[...].X[...] query slice, as a SciPy sparse.csr_matrix """
-    X_layers: Dict[str, sparse.csr_matrix] = attrs.field(factory=dict)
-    """Any additional X layers requested, as SciPy sparse.csr_matrix(s)"""
-    obsm: Dict[str, np.ndarray] = attrs.field(factory=dict)
-    """Experiment.obsm query slice, as a numpy ndarray"""
-    obsp: Dict[str, np.ndarray] = attrs.field(factory=dict)
-    """Experiment.obsp query slice, as a numpy ndarray"""
-    varm: Dict[str, np.ndarray] = attrs.field(factory=dict)
-    """Experiment.varm query slice, as a numpy ndarray"""
-    varp: Dict[str, np.ndarray] = attrs.field(factory=dict)
-    """Experiment.varp query slice, as a numpy ndarray"""
-
-    def to_anndata(self) -> anndata.AnnData:
-        return anndata.AnnData(
-            X=self.X,
-            obs=self.obs,
-            var=self.var,
-            obsm=(self.obsm or None),
-            obsp=(self.obsp or None),
-            varm=(self.varm or None),
-            varp=(self.varp or None),
-            layers=(self.X_layers or None),
-        )
-
-
-class _Axis(enum.Enum):
-    OBS = "obs"
-    VAR = "var"
-
-    @property
-    def value(self) -> Literal["obs", "var"]:
-        return super().value
-
-    @overload
-    def getattr_from(self, __source: "_HasObsVar[_T]") -> "_T": ...
-
-    @overload
-    def getattr_from(
-        self, __source: Any, *, pre: Literal[""], suf: Literal[""]
-    ) -> object: ...
-
-    @overload
-    def getattr_from(
-        self, __source: Any, *, pre: str = ..., suf: str = ...
-    ) -> object: ...
-
-    def getattr_from(self, __source: Any, *, pre: str = "", suf: str = "") -> object:
-        """Equivalent to ``something.
<pre><obs/var><suf>``."""
-        return getattr(__source, pre + self.value + suf)
-
-    def getitem_from(
-        self, __source: Mapping[str, "_T"], *, pre: str = "", suf: str = ""
-    ) -> "_T":
-        """Equivalent to ``something[pre + "obs"/"var" + suf]``."""
-        return __source[pre + self.value + suf]
-
-
-@attrs.define(frozen=True)
-class _MatrixAxisQuery:
-    """The per-axis user query definition. Private."""
-
-    obs: axis.AxisQuery
-    var: axis.AxisQuery
-
-
-@attrs.define
-class _JoinIDCache:
-    """A cache for per-axis join ids in the query. Private."""
-
-    owner: ExperimentAxisQuery
-
-    _cached_obs: Optional[pa.IntegerArray] = None
-    _cached_var: Optional[pa.IntegerArray] = None
-
-    def _is_cached(self, axis: _Axis) -> bool:
-        field = "_cached_" + axis.value
-        return getattr(self, field) is not None
-
-    def preload(self, pool: futures.ThreadPoolExecutor) -> None:
-        if self._cached_obs is not None and self._cached_var is not None:
-            return
-        obs_ft = pool.submit(lambda: self.obs)
-        var_ft = pool.submit(lambda: self.var)
-        # Wait for them and raise in case of error.
-        obs_ft.result()
-        var_ft.result()
-
-    @property
-    def obs(self) -> pa.IntegerArray:
-        """Join IDs for the obs axis. Will load and cache if not already."""
-        if not self._cached_obs:
-            self._cached_obs = _load_joinids(
-                self.owner._obs_df, self.owner._matrix_axis_query.obs
-            )
-        return self._cached_obs
-
-    @obs.setter
-    def obs(self, val: pa.IntegerArray) -> None:
-        self._cached_obs = val
-
-    @property
-    def var(self) -> pa.IntegerArray:
-        """Join IDs for the var axis. Will load and cache if not already."""
-        if not self._cached_var:
-            self._cached_var = _load_joinids(
-                self.owner._var_df, self.owner._matrix_axis_query.var
-            )
-        return self._cached_var
-
-    @var.setter
-    def var(self, val: pa.IntegerArray) -> None:
-        self._cached_var = val
-
-
-def _load_joinids(df: data.DataFrame, axq: axis.AxisQuery) -> pa.IntegerArray:
-    tbl = df.read(
-        axq.coords,
-        value_filter=axq.value_filter,
-        column_names=["soma_joinid"],
-    ).concat()
-    return tbl.column("soma_joinid").combine_chunks()
-
-
-_Numpyable = Union[pa.Array, pa.ChunkedArray, npt.NDArray[np.int64]]
+Numpyable = Union[pa.Array, pa.ChunkedArray, npt.NDArray[np.int64]]
 """Things that can be converted to a NumPy array."""
 
 
-@attrs.define
-class AxisIndexer:
+class AxisIndexer(ABC):
     """
     Given a query, provides index-building services for obs/var axis.
 
     Lifecycle: maturing
     """
 
-    query: ExperimentAxisQuery
-    _index_factory: types.IndexFactory
-    _cached_obs: Optional[types.IndexLike] = None
-    _cached_var: Optional[types.IndexLike] = None
-
-    @property
-    def _obs_index(self) -> types.IndexLike:
-        """Private. Return an index for the ``obs`` axis."""
-        if self._cached_obs is None:
-            self._cached_obs = self._index_factory(self.query.obs_joinids().to_numpy())
-        return self._cached_obs
-
-    @property
-    def _var_index(self) -> types.IndexLike:
-        """Private. Return an index for the ``var`` axis."""
-        if self._cached_var is None:
-            self._cached_var = self._index_factory(self.query.var_joinids().to_numpy())
-        return self._cached_var
-
-    def by_obs(self, coords: _Numpyable) -> npt.NDArray[np.intp]:
+    @abstractmethod
+    def by_obs(self, coords: Numpyable) -> npt.NDArray[np.intp]:
         """Reindex the coords (soma_joinids) over the ``obs`` axis."""
-        return self._obs_index.get_indexer(_to_numpy(coords))
+        ...
 
-    def by_var(self, coords: _Numpyable) -> npt.NDArray[np.intp]:
+    @abstractmethod
+    def by_var(self, coords: Numpyable) -> npt.NDArray[np.intp]:
         """Reindex for the coords (soma_joinids) over the ``var`` axis."""
-        return self._var_index.get_indexer(_to_numpy(coords))
-
+        ...
 
-def _to_numpy(it: _Numpyable) -> np.ndarray:
-    if isinstance(it, np.ndarray):
-        return it
-    return it.to_numpy()
 
-
-#
-# Type shenanigans
-#
-
-_T = TypeVar("_T")
-_T_co = TypeVar("_T_co", covariant=True)
-
-
-class _Experimentish(Protocol):
+class Experimentish(Protocol):
     """The API we need from an Experiment."""
 
     @property
     def ms(self) -> Mapping[str, measurement.Measurement]: ...
 
     @property
-    def obs(self) -> data.DataFrame: ...
+    def obs(self) -> DataFrame: ...
 
     @property
     def context(self) -> Optional[base_types.ContextBase]: ...
 
     @property
-    def obs_spatial_presence(self) -> data.DataFrame: ...
-
-
-class _HasObsVar(Protocol[_T_co]):
-    """Something which has an ``obs`` and ``var`` field.
-
-    Used to give nicer type inference in :meth:`_Axis.getattr_from`.
-    """
-
-    @property
-    def obs(self) -> _T_co: ...
-
-    @property
-    def var(self) -> _T_co: ...
+    def obs_spatial_presence(self) -> DataFrame: ...
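With ``ExperimentAxisQuery`` now an ABC, the concrete query object comes from an implementation's ``Experiment.axis_query`` rather than from somacore itself, but the calling pattern stays the same. A minimal usage sketch, assuming ``exp`` is an open ``Experiment`` from some concrete SOMA implementation and that the ``"RNA"`` measurement and ``"data"`` X layer exist (all of these names are placeholders, not part of this change):

import somacore

# `exp` is assumed to come from a concrete implementation,
# e.g. exp = some_impl.Experiment.open(uri)
with exp.axis_query(
    "RNA",
    obs_query=somacore.AxisQuery(value_filter='tissue == "lung"'),
    var_query=somacore.AxisQuery(coords=(slice(0, 999),)),
) as q:
    n_obs = q.n_obs                      # number of matching obs rows
    obs_table = q.obs().concat()         # obs slice as a single Arrow table
    adata = q.to_anndata(X_name="data")  # materialize the query as AnnData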
diff --git a/python-spec/src/somacore/query/types.py b/python-spec/src/somacore/query/types.py
index f5caa43..e8de994 100644
--- a/python-spec/src/somacore/query/types.py
+++ b/python-spec/src/somacore/query/types.py
@@ -1,13 +1,13 @@
 """Common types used across SOMA query modules."""
 
-from typing import Any, Callable, Union
+from typing import Callable, Union
 
 import numpy as np
 import numpy.typing as npt
 import pyarrow as pa
 from typing_extensions import Protocol
 
-_IntegerArray = Union[npt.NDArray[np.int64], pa.IntegerArray]
+IntegerArray = Union[npt.NDArray[np.int64], pa.IntegerArray]
 
 
 class IndexLike(Protocol):
@@ -19,12 +19,12 @@ class IndexLike(Protocol):
     not as a full specification of the types and behavior of ``get_indexer``.
     """
 
-    def get_indexer(self, target: _IntegerArray) -> Any:
+    def get_indexer(self, target: IntegerArray) -> npt.NDArray[np.intp]:
         """Something compatible with Pandas' Index.get_indexer method."""
 
 
-IndexFactory = Callable[[_IntegerArray], "IndexLike"]
-"""Function that builds an index over the given NDArray.
+IndexFactory = Callable[[IntegerArray], IndexLike]
+"""Function that builds an index over the given ``IntegerArray``.
 
 This interface is implemented by the callable ``pandas.Index``.
 """
diff --git a/python-spec/testing/test_collection.py b/python-spec/testing/test_collection.py
deleted file mode 100644
index e997c6d..0000000
--- a/python-spec/testing/test_collection.py
+++ /dev/null
@@ -1,44 +0,0 @@
-import unittest
-from typing import Any
-
-from somacore import ephemeral
-
-
-class EphemeralCollectionTest(unittest.TestCase):
-    def test_basic(self):
-        # Since the ephemeral Collection implementation is straightforward,
-        # this is just to ensure that we actually fulfill everything.
-
-        coll = ephemeral.Collection[Any]()
-        entry_a = ephemeral.Collection[Any]()
-        coll["a"] = entry_a
-        self.assertIs(entry_a, coll["a"])
-        del coll["a"]
-
-        md = coll.metadata
-
-        md["hello"] = "world"
-
-        self.assertEqual("world", coll.metadata["hello"])
-
-    def test_equality_identity(self):
-        # Ensures that only object identity is used to compare SOMA objects,
-        # and nothing else.
-        # If these were any other Mapping type, they would be `__eq__` here,
-        # since they both have the same (i.e., no) elements.
-        coll = ephemeral.Collection[Any]()
-        coll_2 = ephemeral.Collection[Any]()
-        self.assertNotEqual(coll, coll_2)
-        both = frozenset((coll, coll_2))
-        self.assertIn(coll, both)
-        self.assertIn(coll_2, both)
-
-    def test_method_resolution_order(self):
-        # Ensures that constant definitions interact correctly with the MRO.
-
-        m = ephemeral.Measurement()
-        self.assertEqual("SOMAMeasurement", m.soma_type)
-        exp = ephemeral.Experiment()
-        self.assertEqual("SOMAExperiment", exp.soma_type)
-        scene = ephemeral.Scene()
-        self.assertEqual("SOMAScene", scene.soma_type)
diff --git a/python-spec/testing/test_eager_iter.py b/python-spec/testing/test_eager_iter.py
deleted file mode 100644
index 87f74b9..0000000
--- a/python-spec/testing/test_eager_iter.py
+++ /dev/null
@@ -1,64 +0,0 @@
-import threading
-import unittest
-from concurrent import futures
-from unittest import mock
-
-from somacore.query import _eager_iter
-
-
-class EagerIterTest(unittest.TestCase):
-    def setUp(self):
-        super().setUp()
-        self.kiddie_pool = futures.ThreadPoolExecutor(1)
-        """Tiny thread pool for testing."""
-        self.verify_pool = futures.ThreadPoolExecutor(1)
-        """Separate thread pool so verification is not blocked."""
-
-    def tearDown(self):
-        self.verify_pool.shutdown(wait=False)
-        self.kiddie_pool.shutdown(wait=False)
-        super().tearDown()
-
-    def test_thread_starvation(self):
-        sem = threading.Semaphore()
-        try:
-            # Monopolize the threadpool.
-            sem.acquire()
-            self.kiddie_pool.submit(sem.acquire)
-            eager = _eager_iter.EagerIterator(iter("abc"), pool=self.kiddie_pool)
-            got_a = self.verify_pool.submit(lambda: next(eager))
-            self.assertEqual("a", got_a.result(0.1))
-            got_b = self.verify_pool.submit(lambda: next(eager))
-            self.assertEqual("b", got_b.result(0.1))
-            got_c = self.verify_pool.submit(lambda: next(eager))
-            self.assertEqual("c", got_c.result(0.1))
-            with self.assertRaises(StopIteration):
-                self.verify_pool.submit(lambda: next(eager)).result(0.1)
-        finally:
-            sem.release()
-
-    def test_nesting(self):
-        inner = _eager_iter.EagerIterator(iter("abc"), pool=self.kiddie_pool)
-        outer = _eager_iter.EagerIterator(inner, pool=self.kiddie_pool)
-        self.assertEqual(
-            "a, b, c", self.verify_pool.submit(", ".join, outer).result(0.1)
-        )
-
-    def test_exceptions(self):
-        flaky = mock.MagicMock()
-        flaky.__next__.side_effect = [1, 2, ValueError(), 3, 4]
-
-        eager_flaky = _eager_iter.EagerIterator(flaky, pool=self.kiddie_pool)
-        got_1 = self.verify_pool.submit(lambda: next(eager_flaky))
-        self.assertEqual(1, got_1.result(0.1))
-        got_2 = self.verify_pool.submit(lambda: next(eager_flaky))
-        self.assertEqual(2, got_2.result(0.1))
-        with self.assertRaises(ValueError):
-            self.verify_pool.submit(lambda: next(eager_flaky)).result(0.1)
-        got_3 = self.verify_pool.submit(lambda: next(eager_flaky))
-        self.assertEqual(3, got_3.result(0.1))
-        got_4 = self.verify_pool.submit(lambda: next(eager_flaky))
-        self.assertEqual(4, got_4.result(0.1))
-        for _ in range(5):
-            with self.assertRaises(StopIteration):
-                self.verify_pool.submit(lambda: next(eager_flaky)).result(0.1)
diff --git a/python-spec/testing/test_mixin.py b/python-spec/testing/test_mixin.py
deleted file mode 100644
index 6da3b2a..0000000
--- a/python-spec/testing/test_mixin.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import unittest
-
-from somacore import _mixin
-from somacore import ephemeral
-
-
-class TestItem(unittest.TestCase):
-    def test_get(self):
-        the_a = _mixin.item(str)
-
-        class ItemHaver(ephemeral.Collection):
-            a = the_a
-            b = _mixin.item(int, "base_b")
-
-        self.assertIs(the_a, ItemHaver.a)
-
-        items = ItemHaver()
-        items["c"] = "d"
-
-        with self.assertRaises(AttributeError):
-            items.a
-        with self.assertRaises(AttributeError):
-            items.b
-        with self.assertRaises(AttributeError):
-            del items.a
-
-        self.assertEqual("d", items["c"])
-
-        items["a"] = "a"
-        items["base_b"] = 1
-        self.assertEqual("a", items.a)
-        self.assertEqual(1, items.b)
-
-        items.a = "hello"
-        items.b = 500
-
-        self.assertEqual(dict(a="hello", base_b=500, c="d"), dict(items))
-        del items.a
-        self.assertEqual(dict(base_b=500, c="d"), dict(items))
diff --git a/python-spec/testing/test_query_axis.py b/python-spec/testing/test_query_axis.py
index e9d418f..a235698 100644
--- a/python-spec/testing/test_query_axis.py
+++ b/python-spec/testing/test_query_axis.py
@@ -1,13 +1,11 @@
 from typing import Any, Tuple
 
-import attrs
 import numpy as np
 import pytest
 from pytest import mark
 
 import somacore
 from somacore import options
-from somacore.query import query
 
 
 @mark.parametrize(
@@ -51,24 +49,3 @@ def test_canonicalization_nparray() -> None:
 def test_canonicalization_bad(coords) -> None:
     with pytest.raises(TypeError):
         somacore.AxisQuery(coords=coords)
-
-
-@attrs.define(frozen=True)
-class IHaveObsVarStuff:
-    obs: int
-    var: int
-    the_obs_suf: str
-    the_var_suf: str
-
-
-def test_axis_helpers() -> None:
-    thing = IHaveObsVarStuff(obs=1, var=2, the_obs_suf="observe", the_var_suf="vary")
-    assert 1 == query._Axis.OBS.getattr_from(thing)
-    assert 2 == query._Axis.VAR.getattr_from(thing)
-    assert "observe" == query._Axis.OBS.getattr_from(thing, pre="the_", suf="_suf")
-    assert "vary" == query._Axis.VAR.getattr_from(thing, pre="the_", suf="_suf")
-    ovdict = {"obs": "erve", "var": "y", "i_obscure": "hide", "i_varcure": "???"}
-    assert "erve" == query._Axis.OBS.getitem_from(ovdict)
-    assert "y" == query._Axis.VAR.getitem_from(ovdict)
-    assert "hide" == query._Axis.OBS.getitem_from(ovdict, pre="i_", suf="cure")
-    assert "???" == query._Axis.VAR.getitem_from(ovdict, pre="i_", suf="cure")