From 861f2243d94250b22d0f778b05ed36fcb2b67cbc Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 6 Nov 2024 14:18:31 -0500 Subject: [PATCH 1/4] make `{experiment,query}.py` classes abstract --- python-spec/src/somacore/experiment.py | 37 +- python-spec/src/somacore/query/__init__.py | 2 + python-spec/src/somacore/query/query.py | 723 ++------------------- python-spec/src/somacore/query/types.py | 10 +- python-spec/testing/test_query_axis.py | 23 - 5 files changed, 77 insertions(+), 718 deletions(-) diff --git a/python-spec/src/somacore/experiment.py b/python-spec/src/somacore/experiment.py index fbdcb517..e9100c7a 100644 --- a/python-spec/src/somacore/experiment.py +++ b/python-spec/src/somacore/experiment.py @@ -1,6 +1,7 @@ +import abc from typing import Generic, Optional, TypeVar -from typing_extensions import Final, Self +from typing_extensions import Final from . import _mixin from . import base @@ -9,6 +10,7 @@ from . import measurement from . import query from . import scene +from .query import ExperimentAxisQuery _DF = TypeVar("_DF", bound=data.DataFrame) """An implementation of a DataFrame.""" @@ -20,8 +22,10 @@ """The root SOMA object type of the implementation.""" -class Experiment( - collection.BaseCollection[_RootSO], Generic[_DF, _MeasColl, _SceneColl, _RootSO] +class Experiment( # type: ignore[misc] # __eq__ false positive + collection.BaseCollection[_RootSO], + Generic[_DF, _MeasColl, _SceneColl, _RootSO], + abc.ABC, ): """A collection subtype representing an annotated 2D matrix of measurements. @@ -33,22 +37,6 @@ class Experiment( Lifecycle: maturing """ - # This class is implemented as a mixin to be used with SOMA classes. - # For example, a SOMA implementation would look like this: - # - # # This type-ignore comment will always be needed due to limitations - # # of type annotations; it is (currently) expected. - # class Experiment( # type: ignore[type-var] - # ImplBaseCollection[ImplSOMAObject], - # somacore.Experiment[ - # ImplDataFrame, # _DF - # ImplMeasurement, # _MeasColl - # ImplScene, # _SceneColl - # ImplSOMAObject, # _RootSO - # ], - # ): - # ... - __slots__ = () soma_type: Final = "SOMAExperiment" # type: ignore[misc] @@ -83,18 +71,11 @@ def axis_query( *, obs_query: Optional[query.AxisQuery] = None, var_query: Optional[query.AxisQuery] = None, - ) -> "query.ExperimentAxisQuery[Self]": + ) -> ExperimentAxisQuery: """Creates an axis query over this experiment. See :class:`query.ExperimentAxisQuery` for details on usage. Lifecycle: maturing """ - # mypy doesn't quite understand descriptors so it issues a spurious - # error here. - return query.ExperimentAxisQuery( # type: ignore[type-var] - self, - measurement_name, - obs_query=obs_query or query.AxisQuery(), - var_query=var_query or query.AxisQuery(), - ) + raise NotImplementedError diff --git a/python-spec/src/somacore/query/__init__.py b/python-spec/src/somacore/query/__init__.py index 41598d9a..808b7048 100644 --- a/python-spec/src/somacore/query/__init__.py +++ b/python-spec/src/somacore/query/__init__.py @@ -3,10 +3,12 @@ ExperimentAxisQuery = query.ExperimentAxisQuery AxisColumnNames = query.AxisColumnNames +AxisIndexer = query.AxisIndexer AxisQuery = axis.AxisQuery __all__ = ( "ExperimentAxisQuery", "AxisColumnNames", + "AxisIndexer", "AxisQuery", ) diff --git a/python-spec/src/somacore/query/query.py b/python-spec/src/somacore/query/query.py index a42e1909..0231fe1c 100644 --- a/python-spec/src/somacore/query/query.py +++ b/python-spec/src/somacore/query/query.py @@ -1,39 +1,29 @@ -import enum -from concurrent import futures from typing import ( Any, - Callable, - Dict, - Generic, Mapping, Optional, Sequence, - Tuple, - TypeVar, Union, - cast, - overload, ) -import anndata -import attrs import numpy as np import numpy.typing as npt -import pandas as pd import pyarrow as pa -import pyarrow.compute as pacomp -from scipy import sparse -from typing_extensions import Literal, Protocol, Self, TypedDict +from anndata import AnnData +from typing_extensions import Protocol, Self, TypedDict -from .. import data +from .. import DataFrame +from .. import ReadIter +from .. import SparseRead from .. import measurement -from .. import options from .. import types as base_types -from . import _fast_csr -from . import axis -from . import types +from ..options import BatchSize +from ..options import PlatformConfig +from ..options import ReadPartitions +from ..options import ResultOrder +from ..options import ResultOrderStr -_RO_AUTO = options.ResultOrder.AUTO +_RO_AUTO = ResultOrder.AUTO class AxisColumnNames(TypedDict, total=False): @@ -49,11 +39,7 @@ class AxisColumnNames(TypedDict, total=False): """var columns to use. All columns if ``None`` or not present.""" -_Exp = TypeVar("_Exp", bound="_Experimentish") -"""TypeVar for the concrete type of an experiment-like object.""" - - -class ExperimentAxisQuery(Generic[_Exp]): +class ExperimentAxisQuery: """Axis-based query against a SOMA Experiment. ExperimentAxisQuery allows easy selection and extraction of data from a @@ -64,121 +50,56 @@ class ExperimentAxisQuery(Generic[_Exp]): var value and/or coordinates. Slicing on :class:`SparseNDArray` ``X`` matrices is supported; :class:`DenseNDArray` is not supported at this time. - IMPORTANT: this class is not thread-safe. - - IMPORTANT: this query class assumes it can store the full result of both - axis dataframe queries in memory, and only provides incremental access to - the underlying X NDArray. API features such as ``n_obs`` and ``n_vars`` - codify this in the API. - - IMPORTANT: you must call ``close()`` on any instance of this class to - release underlying resources. The ExperimentAxisQuery is a context manager, - and it is recommended that you use the following pattern to make this easy - and safe:: - - with ExperimentAxisQuery(...) as query: - ... - - This base query implementation is designed to work against any SOMA - implementation that fulfills the basic APIs. A SOMA implementation may - include a custom query implementation optimized for its own use. - Lifecycle: maturing """ - def __init__( - self, - experiment: _Exp, - measurement_name: str, - *, - obs_query: axis.AxisQuery = axis.AxisQuery(), - var_query: axis.AxisQuery = axis.AxisQuery(), - index_factory: types.IndexFactory = pd.Index, - ): - if measurement_name not in experiment.ms: - raise ValueError("Measurement does not exist in the experiment") - - # Users often like to pass `foo=None` and we should let them - obs_query = obs_query or axis.AxisQuery() - var_query = var_query or axis.AxisQuery() - - self.experiment = experiment - self.measurement_name = measurement_name - - self._matrix_axis_query = _MatrixAxisQuery(obs=obs_query, var=var_query) - self._joinids = _JoinIDCache(self) - self._indexer = AxisIndexer( - self, - index_factory=index_factory, - ) - self._index_factory = index_factory - self._threadpool_: Optional[futures.ThreadPoolExecutor] = None - def obs( self, *, column_names: Optional[Sequence[str]] = None, - batch_size: options.BatchSize = options.BatchSize(), - partitions: Optional[options.ReadPartitions] = None, - result_order: options.ResultOrderStr = _RO_AUTO, - platform_config: Optional[options.PlatformConfig] = None, - ) -> data.ReadIter[pa.Table]: + batch_size: BatchSize = BatchSize(), + partitions: Optional[ReadPartitions] = None, + result_order: ResultOrderStr = _RO_AUTO, + platform_config: Optional[PlatformConfig] = None, + ) -> ReadIter[pa.Table]: """Returns ``obs`` as an `Arrow table `_ iterator. Lifecycle: maturing """ - obs_query = self._matrix_axis_query.obs - return self._obs_df.read( - obs_query.coords, - value_filter=obs_query.value_filter, - column_names=column_names, - batch_size=batch_size, - partitions=partitions, - result_order=result_order, - platform_config=platform_config, - ) + raise NotImplementedError def var( self, *, column_names: Optional[Sequence[str]] = None, - batch_size: options.BatchSize = options.BatchSize(), - partitions: Optional[options.ReadPartitions] = None, - result_order: options.ResultOrderStr = _RO_AUTO, - platform_config: Optional[options.PlatformConfig] = None, - ) -> data.ReadIter[pa.Table]: + batch_size: BatchSize = BatchSize(), + partitions: Optional[ReadPartitions] = None, + result_order: ResultOrderStr = _RO_AUTO, + platform_config: Optional[PlatformConfig] = None, + ) -> ReadIter[pa.Table]: """Returns ``var`` as an `Arrow table `_ iterator. Lifecycle: maturing """ - var_query = self._matrix_axis_query.var - return self._var_df.read( - var_query.coords, - value_filter=var_query.value_filter, - column_names=column_names, - batch_size=batch_size, - partitions=partitions, - result_order=result_order, - platform_config=platform_config, - ) + raise NotImplementedError def obs_joinids(self) -> pa.IntegerArray: """Returns ``obs`` ``soma_joinids`` as an Arrow array. Lifecycle: maturing """ - return self._joinids.obs + raise NotImplementedError def var_joinids(self) -> pa.IntegerArray: """Returns ``var`` ``soma_joinids`` as an Arrow array. Lifecycle: maturing """ - return self._joinids.var + raise NotImplementedError @property def n_obs(self) -> int: @@ -186,7 +107,7 @@ def n_obs(self) -> int: Lifecycle: maturing """ - return len(self.obs_joinids()) + raise NotImplementedError @property def n_vars(self) -> int: @@ -194,7 +115,7 @@ def n_vars(self) -> int: Lifecycle: maturing """ - return len(self.var_joinids()) + raise NotImplementedError @property def indexer(self) -> "AxisIndexer": @@ -202,73 +123,59 @@ def indexer(self) -> "AxisIndexer": Lifecycle: maturing """ - return self._indexer + raise NotImplementedError def X( self, layer_name: str, *, - batch_size: options.BatchSize = options.BatchSize(), - partitions: Optional[options.ReadPartitions] = None, - result_order: options.ResultOrderStr = _RO_AUTO, - platform_config: Optional[options.PlatformConfig] = None, - ) -> data.SparseRead: + batch_size: BatchSize = BatchSize(), + partitions: Optional[ReadPartitions] = None, + result_order: ResultOrderStr = _RO_AUTO, + platform_config: Optional[PlatformConfig] = None, + ) -> SparseRead: """Returns an ``X`` layer as a sparse read. Args: layer_name: The X layer name to return. batch_size: The size of batches that should be returned from a read. - See :class:`options.BatchSize` for details. + See :class:`BatchSize` for details. partitions: Specifies that this is part of a partitioned read, and which partition to include, if present. result_order: the order to return results, specified as a - :class:`~options.ResultOrder` or its string value. + :class:`~ResultOrder` or its string value. platform_config: platform-specific configuration; keys are SOMA implementation names. Lifecycle: maturing """ - try: - x_layer = self._ms.X[layer_name] - except KeyError as ke: - raise KeyError(f"{layer_name} is not present in X") from ke - if not isinstance(x_layer, data.SparseNDArray): - raise TypeError("X layers may only be sparse arrays") - - self._joinids.preload(self._threadpool) - return x_layer.read( - (self._joinids.obs, self._joinids.var), - batch_size=batch_size, - partitions=partitions, - result_order=result_order, - platform_config=platform_config, - ) - - def obsp(self, layer: str) -> data.SparseRead: + raise NotImplementedError + + def obsp(self, layer: str) -> SparseRead: """Returns an ``obsp`` layer as a sparse read. Lifecycle: maturing """ - return self._axisp_inner(_Axis.OBS, layer) + raise NotImplementedError - def varp(self, layer: str) -> data.SparseRead: + def varp(self, layer: str) -> SparseRead: """Returns a ``varp`` layer as a sparse read. Lifecycle: maturing """ - return self._axisp_inner(_Axis.VAR, layer) + raise NotImplementedError - def obsm(self, layer: str) -> data.SparseRead: + def obsm(self, layer: str) -> SparseRead: """Returns an ``obsm`` layer as a sparse read. Lifecycle: maturing """ - return self._axism_inner(_Axis.OBS, layer) + raise NotImplementedError - def varm(self, layer: str) -> data.SparseRead: + def varm(self, layer: str) -> SparseRead: """Returns a ``varm`` layer as a sparse read. Lifecycle: maturing """ - return self._axism_inner(_Axis.VAR, layer) + raise NotImplementedError def obs_scene_ids(self) -> pa.Array: """Returns a pyarrow array with scene ids that contain obs from this @@ -276,20 +183,7 @@ def obs_scene_ids(self) -> pa.Array: Lifecycle: experimental """ - try: - obs_scene = self.experiment.obs_spatial_presence - except KeyError as ke: - raise KeyError("Missing obs_scene") from ke - if not isinstance(obs_scene, data.DataFrame): - raise TypeError("obs_scene must be a dataframe.") - - full_table = obs_scene.read( - coords=((_Axis.OBS.getattr_from(self._joinids), slice(None))), - result_order=options.ResultOrder.COLUMN_MAJOR, - value_filter="data != 0", - ).concat() - - return pacomp.unique(full_table["scene_id"]) + raise NotImplementedError def var_scene_ids(self) -> pa.Array: """Return a pyarrow array with scene ids that contain var from this @@ -297,20 +191,7 @@ def var_scene_ids(self) -> pa.Array: Lifecycle: experimental """ - try: - var_scene = self._ms.var_spatial_presence - except KeyError as ke: - raise KeyError("Missing var_scene") from ke - if not isinstance(var_scene, data.DataFrame): - raise TypeError("var_scene must be a dataframe.") - - full_table = var_scene.read( - coords=((_Axis.OBS.getattr_from(self._joinids), slice(None))), - result_order=options.ResultOrder.COLUMN_MAJOR, - value_filter="data != 0", - ).concat() - - return pacomp.unique(full_table["scene_id"]) + raise NotImplementedError def to_anndata( self, @@ -323,7 +204,7 @@ def to_anndata( varm_layers: Sequence[str] = (), varp_layers: Sequence[str] = (), drop_levels: bool = False, - ) -> anndata.AnnData: + ) -> AnnData: """ Executes the query and return result as an ``AnnData`` in-memory object. @@ -349,26 +230,7 @@ def to_anndata( Lifecycle: maturing """ - ad = self._read( - X_name, - column_names=column_names or AxisColumnNames(obs=None, var=None), - X_layers=X_layers, - obsm_layers=obsm_layers, - obsp_layers=obsp_layers, - varm_layers=varm_layers, - varp_layers=varp_layers, - ).to_anndata() - - # Drop unused categories on axis dataframes if requested - if drop_levels: - for name in ad.obs: - if ad.obs[name].dtype.name == "category": - ad.obs[name] = ad.obs[name].cat.remove_unused_categories() - for name in ad.var: - if ad.var[name].dtype.name == "category": - ad.var[name] = ad.var[name].cat.remove_unused_categories() - - return ad + raise NotImplementedError # Context management @@ -379,436 +241,19 @@ def close(self) -> None: Lifecycle: maturing """ - # Because this may be called during ``__del__`` when we might be getting - # disassembled, sometimes ``_threadpool_`` is simply missing. - # Only try to shut it down if it still exists. - pool = getattr(self, "_threadpool_", None) - if pool is None: - return - pool.shutdown() - self._threadpool_ = None + raise NotImplementedError def __enter__(self) -> Self: - return self + raise NotImplementedError def __exit__(self, *_: Any) -> None: - self.close() - - def __del__(self) -> None: - """Ensure that we're closed when our last ref disappears.""" - self.close() - # If any superclass in our MRO has a __del__, call it. - sdel = getattr(super(), "__del__", lambda: None) - sdel() - - # Internals - - def _read( - self, - X_name: str, - *, - column_names: AxisColumnNames, - X_layers: Sequence[str], - obsm_layers: Sequence[str] = (), - obsp_layers: Sequence[str] = (), - varm_layers: Sequence[str] = (), - varp_layers: Sequence[str] = (), - ) -> "_AxisQueryResult": - """Reads the entire query result in memory. - - This is a low-level routine intended to be used by loaders for other - in-core formats, such as AnnData, which can be created from the - resulting objects. - - Args: - X_name: The X layer to read and return in the ``X`` slot. - column_names: The columns in the ``var`` and ``obs`` dataframes - to read. - X_layers: Additional X layers to read and return - in the ``layers`` slot. - obsm_layers: - Additional obsm layers to read and return in the obsm slot. - obsp_layers: - Additional obsp layers to read and return in the obsp slot. - varm_layers: - Additional varm layers to read and return in the varm slot. - varp_layers: - Additional varp layers to read and return in the varp slot. - """ - x_collection = self._ms.X - all_x_names = [X_name] + list(X_layers) - all_x_arrays: Dict[str, data.SparseNDArray] = {} - for _xname in all_x_names: - if not isinstance(_xname, str) or not _xname: - raise ValueError("X layer names must be specified as a string.") - if _xname not in x_collection: - raise ValueError("Unknown X layer name") - x_array = x_collection[_xname] - if not isinstance(x_array, data.SparseNDArray): - raise NotImplementedError("Dense array unsupported") - all_x_arrays[_xname] = x_array - - def _read_axis_mappings(fn, axis, keys: Sequence[str]) -> Dict[str, np.ndarray]: - return {key: fn(axis, key) for key in keys} - - obsm_ft = self._threadpool.submit( - _read_axis_mappings, self._axism_inner_ndarray, _Axis.OBS, obsm_layers - ) - obsp_ft = self._threadpool.submit( - _read_axis_mappings, self._axisp_inner_ndarray, _Axis.OBS, obsp_layers - ) - varm_ft = self._threadpool.submit( - _read_axis_mappings, self._axism_inner_ndarray, _Axis.VAR, varm_layers - ) - varp_ft = self._threadpool.submit( - _read_axis_mappings, self._axisp_inner_ndarray, _Axis.VAR, varp_layers - ) - - obs_table, var_table = self._read_both_axes(column_names) - - x_matrices = { - _xname: _fast_csr.read_csr( - all_x_arrays[_xname], - self.obs_joinids(), - self.var_joinids(), - index_factory=self._index_factory, - ).to_scipy() - for _xname in all_x_arrays - } - - x = x_matrices.pop(X_name) - - obs = obs_table.to_pandas() - obs.index = obs.index.astype(str) - - var = var_table.to_pandas() - var.index = var.index.astype(str) - - return _AxisQueryResult( - obs=obs, - var=var, - X=x, - obsm=obsm_ft.result(), - obsp=obsp_ft.result(), - varm=varm_ft.result(), - varp=varp_ft.result(), - X_layers=x_matrices, - ) - - def _read_both_axes( - self, - column_names: AxisColumnNames, - ) -> Tuple[pa.Table, pa.Table]: - """Reads both axes in their entirety, ensuring soma_joinid is retained.""" - obs_ft = self._threadpool.submit( - self._read_axis_dataframe, - _Axis.OBS, - column_names, - ) - var_ft = self._threadpool.submit( - self._read_axis_dataframe, - _Axis.VAR, - column_names, - ) - return obs_ft.result(), var_ft.result() - - def _read_axis_dataframe( - self, - axis: "_Axis", - axis_column_names: AxisColumnNames, - ) -> pa.Table: - """Reads the specified axis. Will cache join IDs if not present.""" - column_names = axis_column_names.get(axis.value) - - axis_df = axis.getattr_from(self, pre="_", suf="_df") - assert isinstance(axis_df, data.DataFrame) - axis_query = axis.getattr_from(self._matrix_axis_query) - - # If we can cache join IDs, prepare to add them to the cache. - joinids_cached = self._joinids._is_cached(axis) - query_columns = column_names - added_soma_joinid_to_columns = False - if ( - not joinids_cached - and column_names is not None - and "soma_joinid" not in column_names - ): - # If we want to fill the join ID cache, ensure that we query the - # soma_joinid column so that it is included in the results. - # We'll filter it out later. - query_columns = ["soma_joinid"] + list(column_names) - added_soma_joinid_to_columns = True - - # Do the actual query. - arrow_table = axis_df.read( - coords=axis_query.coords, - value_filter=axis_query.value_filter, - column_names=query_columns, - ).concat() - - # Update the cache if needed. We can do this because no matter what - # other columns are queried for, the contents of the ``soma_joinid`` - # column will be the same and can be safely stored. - if not joinids_cached: - setattr( - self._joinids, - axis.value, - arrow_table.column("soma_joinid").combine_chunks(), - ) - - # Drop soma_joinid column if we added it solely for use in filling - # the joinid cache. - if added_soma_joinid_to_columns: - arrow_table = arrow_table.drop(["soma_joinid"]) - return arrow_table - - def _axisp_inner( - self, - axis: "_Axis", - layer: str, - ) -> data.SparseRead: - p_name = f"{axis.value}p" - try: - axisp = axis.getitem_from(self._ms, suf="p") - except KeyError as ke: - raise ValueError(f"Measurement does not contain {p_name} data") from ke - - try: - ap_layer = axisp[layer] - except KeyError as ke: - raise ValueError(f"layer {layer!r} is not available in {p_name}") from ke - if not isinstance(ap_layer, data.SparseNDArray): - raise TypeError( - f"Unexpected SOMA type {type(ap_layer).__name__}" - f" stored in {p_name} layer {layer!r}" - ) - - joinids = axis.getattr_from(self._joinids) - return ap_layer.read((joinids, joinids)) - - def _axism_inner( - self, - axis: "_Axis", - layer: str, - ) -> data.SparseRead: - m_name = f"{axis.value}m" - - try: - axism = axis.getitem_from(self._ms, suf="m") - except KeyError: - raise ValueError(f"Measurement does not contain {m_name} data") from None - - try: - axism_layer = axism[layer] - except KeyError as ke: - raise ValueError(f"layer {layer!r} is not available in {m_name}") from ke - - if not isinstance(axism_layer, data.SparseNDArray): - raise TypeError(f"Unexpected SOMA type stored in '{m_name}' layer") - - joinids = axis.getattr_from(self._joinids) - return axism_layer.read((joinids, slice(None))) - - def _convert_to_ndarray( - self, axis: "_Axis", table: pa.Table, n_row: int, n_col: int - ) -> np.ndarray: - indexer = cast( - Callable[[_Numpyable], npt.NDArray[np.intp]], - axis.getattr_from(self.indexer, pre="by_"), - ) - idx = indexer(table["soma_dim_0"]) - z: np.ndarray = np.zeros(n_row * n_col, dtype=np.float32) - np.put(z, idx * n_col + table["soma_dim_1"], table["soma_data"]) - return z.reshape(n_row, n_col) - - def _axisp_inner_ndarray( - self, - axis: "_Axis", - layer: str, - ) -> np.ndarray: - n_row = n_col = len(axis.getattr_from(self._joinids)) - - table = self._axisp_inner(axis, layer).tables().concat() - return self._convert_to_ndarray(axis, table, n_row, n_col) - - def _axism_inner_ndarray( - self, - axis: "_Axis", - layer: str, - ) -> np.ndarray: - table = self._axism_inner(axis, layer).tables().concat() - - n_row = len(axis.getattr_from(self._joinids)) - n_col = len(table["soma_dim_1"].unique()) + raise NotImplementedError - return self._convert_to_ndarray(axis, table, n_row, n_col) - @property - def _obs_df(self) -> data.DataFrame: - return self.experiment.obs - - @property - def _ms(self) -> measurement.Measurement: - return self.experiment.ms[self.measurement_name] - - @property - def _var_df(self) -> data.DataFrame: - return self._ms.var - - @property - def _threadpool(self) -> futures.ThreadPoolExecutor: - """ - Returns the threadpool provided by the experiment's context. - If not available, creates a thread pool just in time.""" - context = self.experiment.context - if context and context.threadpool: - return context.threadpool - - if self._threadpool_ is None: - self._threadpool_ = futures.ThreadPoolExecutor() - return self._threadpool_ - - -# Private internal data structures - - -@attrs.define(frozen=True) -class _AxisQueryResult: - """The result of running :meth:`ExperimentAxisQuery.read`. Private.""" - - obs: pd.DataFrame - """Experiment.obs query slice, as a pandas DataFrame""" - var: pd.DataFrame - """Experiment.ms[...].var query slice, as a pandas DataFrame""" - X: sparse.csr_matrix - """Experiment.ms[...].X[...] query slice, as a SciPy sparse.csr_matrix """ - X_layers: Dict[str, sparse.csr_matrix] = attrs.field(factory=dict) - """Any additional X layers requested, as SciPy sparse.csr_matrix(s)""" - obsm: Dict[str, np.ndarray] = attrs.field(factory=dict) - """Experiment.obsm query slice, as a numpy ndarray""" - obsp: Dict[str, np.ndarray] = attrs.field(factory=dict) - """Experiment.obsp query slice, as a numpy ndarray""" - varm: Dict[str, np.ndarray] = attrs.field(factory=dict) - """Experiment.varm query slice, as a numpy ndarray""" - varp: Dict[str, np.ndarray] = attrs.field(factory=dict) - """Experiment.varp query slice, as a numpy ndarray""" - - def to_anndata(self) -> anndata.AnnData: - return anndata.AnnData( - X=self.X, - obs=self.obs, - var=self.var, - obsm=(self.obsm or None), - obsp=(self.obsp or None), - varm=(self.varm or None), - varp=(self.varp or None), - layers=(self.X_layers or None), - ) - - -class _Axis(enum.Enum): - OBS = "obs" - VAR = "var" - - @property - def value(self) -> Literal["obs", "var"]: - return super().value - - @overload - def getattr_from(self, __source: "_HasObsVar[_T]") -> "_T": ... - - @overload - def getattr_from( - self, __source: Any, *, pre: Literal[""], suf: Literal[""] - ) -> object: ... - - @overload - def getattr_from( - self, __source: Any, *, pre: str = ..., suf: str = ... - ) -> object: ... - - def getattr_from(self, __source: Any, *, pre: str = "", suf: str = "") -> object: - """Equivalent to ``something.
``."""
-        return getattr(__source, pre + self.value + suf)
-
-    def getitem_from(
-        self, __source: Mapping[str, "_T"], *, pre: str = "", suf: str = ""
-    ) -> "_T":
-        """Equivalent to ``something[pre + "obs"/"var" + suf]``."""
-        return __source[pre + self.value + suf]
-
-
-@attrs.define(frozen=True)
-class _MatrixAxisQuery:
-    """The per-axis user query definition. Private."""
-
-    obs: axis.AxisQuery
-    var: axis.AxisQuery
-
-
-@attrs.define
-class _JoinIDCache:
-    """A cache for per-axis join ids in the query. Private."""
-
-    owner: ExperimentAxisQuery
-
-    _cached_obs: Optional[pa.IntegerArray] = None
-    _cached_var: Optional[pa.IntegerArray] = None
-
-    def _is_cached(self, axis: _Axis) -> bool:
-        field = "_cached_" + axis.value
-        return getattr(self, field) is not None
-
-    def preload(self, pool: futures.ThreadPoolExecutor) -> None:
-        if self._cached_obs is not None and self._cached_var is not None:
-            return
-        obs_ft = pool.submit(lambda: self.obs)
-        var_ft = pool.submit(lambda: self.var)
-        # Wait for them and raise in case of error.
-        obs_ft.result()
-        var_ft.result()
-
-    @property
-    def obs(self) -> pa.IntegerArray:
-        """Join IDs for the obs axis. Will load and cache if not already."""
-        if not self._cached_obs:
-            self._cached_obs = _load_joinids(
-                self.owner._obs_df, self.owner._matrix_axis_query.obs
-            )
-        return self._cached_obs
-
-    @obs.setter
-    def obs(self, val: pa.IntegerArray) -> None:
-        self._cached_obs = val
-
-    @property
-    def var(self) -> pa.IntegerArray:
-        """Join IDs for the var axis. Will load and cache if not already."""
-        if not self._cached_var:
-            self._cached_var = _load_joinids(
-                self.owner._var_df, self.owner._matrix_axis_query.var
-            )
-        return self._cached_var
-
-    @var.setter
-    def var(self, val: pa.IntegerArray) -> None:
-        self._cached_var = val
-
-
-def _load_joinids(df: data.DataFrame, axq: axis.AxisQuery) -> pa.IntegerArray:
-    tbl = df.read(
-        axq.coords,
-        value_filter=axq.value_filter,
-        column_names=["soma_joinid"],
-    ).concat()
-    return tbl.column("soma_joinid").combine_chunks()
-
-
-_Numpyable = Union[pa.Array, pa.ChunkedArray, npt.NDArray[np.int64]]
+Numpyable = Union[pa.Array, pa.ChunkedArray, npt.NDArray[np.int64]]
 """Things that can be converted to a NumPy array."""
 
 
-@attrs.define
 class AxisIndexer:
     """
     Given a query, provides index-building services for obs/var axis.
@@ -816,72 +261,26 @@ class AxisIndexer:
     Lifecycle: maturing
     """
 
-    query: ExperimentAxisQuery
-    _index_factory: types.IndexFactory
-    _cached_obs: Optional[types.IndexLike] = None
-    _cached_var: Optional[types.IndexLike] = None
-
-    @property
-    def _obs_index(self) -> types.IndexLike:
-        """Private. Return an index for the ``obs`` axis."""
-        if self._cached_obs is None:
-            self._cached_obs = self._index_factory(self.query.obs_joinids().to_numpy())
-        return self._cached_obs
-
-    @property
-    def _var_index(self) -> types.IndexLike:
-        """Private. Return an index for the ``var`` axis."""
-        if self._cached_var is None:
-            self._cached_var = self._index_factory(self.query.var_joinids().to_numpy())
-        return self._cached_var
-
-    def by_obs(self, coords: _Numpyable) -> npt.NDArray[np.intp]:
+    def by_obs(self, coords: Numpyable) -> npt.NDArray[np.intp]:
         """Reindex the coords (soma_joinids) over the ``obs`` axis."""
-        return self._obs_index.get_indexer(_to_numpy(coords))
+        raise NotImplementedError
 
-    def by_var(self, coords: _Numpyable) -> npt.NDArray[np.intp]:
+    def by_var(self, coords: Numpyable) -> npt.NDArray[np.intp]:
         """Reindex for the coords (soma_joinids) over the ``var`` axis."""
-        return self._var_index.get_indexer(_to_numpy(coords))
-
+        raise NotImplementedError
 
-def _to_numpy(it: _Numpyable) -> np.ndarray:
-    if isinstance(it, np.ndarray):
-        return it
-    return it.to_numpy()
 
-
-#
-# Type shenanigans
-#
-
-_T = TypeVar("_T")
-_T_co = TypeVar("_T_co", covariant=True)
-
-
-class _Experimentish(Protocol):
+class Experimentish(Protocol):
     """The API we need from an Experiment."""
 
     @property
     def ms(self) -> Mapping[str, measurement.Measurement]: ...
 
     @property
-    def obs(self) -> data.DataFrame: ...
+    def obs(self) -> DataFrame: ...
 
     @property
     def context(self) -> Optional[base_types.ContextBase]: ...
 
     @property
-    def obs_spatial_presence(self) -> data.DataFrame: ...
-
-
-class _HasObsVar(Protocol[_T_co]):
-    """Something which has an ``obs`` and ``var`` field.
-
-    Used to give nicer type inference in :meth:`_Axis.getattr_from`.
-    """
-
-    @property
-    def obs(self) -> _T_co: ...
-
-    @property
-    def var(self) -> _T_co: ...
+    def obs_spatial_presence(self) -> DataFrame: ...
diff --git a/python-spec/src/somacore/query/types.py b/python-spec/src/somacore/query/types.py
index f5caa439..e8de994b 100644
--- a/python-spec/src/somacore/query/types.py
+++ b/python-spec/src/somacore/query/types.py
@@ -1,13 +1,13 @@
 """Common types used across SOMA query modules."""
 
-from typing import Any, Callable, Union
+from typing import Callable, Union
 
 import numpy as np
 import numpy.typing as npt
 import pyarrow as pa
 from typing_extensions import Protocol
 
-_IntegerArray = Union[npt.NDArray[np.int64], pa.IntegerArray]
+IntegerArray = Union[npt.NDArray[np.int64], pa.IntegerArray]
 
 
 class IndexLike(Protocol):
@@ -19,12 +19,12 @@ class IndexLike(Protocol):
     not as a full specification of the types and behavior of ``get_indexer``.
     """
 
-    def get_indexer(self, target: _IntegerArray) -> Any:
+    def get_indexer(self, target: IntegerArray) -> npt.NDArray[np.intp]:
         """Something compatible with Pandas' Index.get_indexer method."""
 
 
-IndexFactory = Callable[[_IntegerArray], "IndexLike"]
-"""Function that builds an index over the given NDArray.
+IndexFactory = Callable[[IntegerArray], IndexLike]
+"""Function that builds an index over the given ``IntegerArray``.
 
 This interface is implemented by the callable ``pandas.Index``.
 """
diff --git a/python-spec/testing/test_query_axis.py b/python-spec/testing/test_query_axis.py
index e9d418f9..a235698d 100644
--- a/python-spec/testing/test_query_axis.py
+++ b/python-spec/testing/test_query_axis.py
@@ -1,13 +1,11 @@
 from typing import Any, Tuple
 
-import attrs
 import numpy as np
 import pytest
 from pytest import mark
 
 import somacore
 from somacore import options
-from somacore.query import query
 
 
 @mark.parametrize(
@@ -51,24 +49,3 @@ def test_canonicalization_nparray() -> None:
 def test_canonicalization_bad(coords) -> None:
     with pytest.raises(TypeError):
         somacore.AxisQuery(coords=coords)
-
-
-@attrs.define(frozen=True)
-class IHaveObsVarStuff:
-    obs: int
-    var: int
-    the_obs_suf: str
-    the_var_suf: str
-
-
-def test_axis_helpers() -> None:
-    thing = IHaveObsVarStuff(obs=1, var=2, the_obs_suf="observe", the_var_suf="vary")
-    assert 1 == query._Axis.OBS.getattr_from(thing)
-    assert 2 == query._Axis.VAR.getattr_from(thing)
-    assert "observe" == query._Axis.OBS.getattr_from(thing, pre="the_", suf="_suf")
-    assert "vary" == query._Axis.VAR.getattr_from(thing, pre="the_", suf="_suf")
-    ovdict = {"obs": "erve", "var": "y", "i_obscure": "hide", "i_varcure": "???"}
-    assert "erve" == query._Axis.OBS.getitem_from(ovdict)
-    assert "y" == query._Axis.VAR.getitem_from(ovdict)
-    assert "hide" == query._Axis.OBS.getitem_from(ovdict, pre="i_", suf="cure")
-    assert "???" == query._Axis.VAR.getitem_from(ovdict, pre="i_", suf="cure")

From 2b183cb69d3df9a6b097f926edb36abac74c4c6f Mon Sep 17 00:00:00 2001
From: Ryan Williams 
Date: Thu, 7 Nov 2024 14:13:12 -0500
Subject: [PATCH 2/4] `rm _{{test_,}eager_iter,fast_csr}.py` (moved to
 TileDB-SOMA)

---
 pyproject.toml                                |   1 -
 python-spec/requirements-py3.10.txt           |   1 -
 python-spec/requirements-py3.11.txt           |   1 -
 python-spec/requirements-py3.12.txt           |   1 -
 python-spec/requirements-py3.9.txt            |   1 -
 python-spec/src/somacore/query/_eager_iter.py |  51 ----
 python-spec/src/somacore/query/_fast_csr.py   | 275 ------------------
 python-spec/testing/test_eager_iter.py        |  64 ----
 8 files changed, 395 deletions(-)
 delete mode 100644 python-spec/src/somacore/query/_eager_iter.py
 delete mode 100644 python-spec/src/somacore/query/_fast_csr.py
 delete mode 100644 python-spec/testing/test_eager_iter.py

diff --git a/pyproject.toml b/pyproject.toml
index b3f31e2a..7e07880e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,7 +13,6 @@ readme = "./python-spec/README.md"
 dependencies = [
   "anndata",
   "attrs>=22.1",
-  "numba",
   "numpy>=1.21",
   "pandas",
   "pyarrow",
diff --git a/python-spec/requirements-py3.10.txt b/python-spec/requirements-py3.10.txt
index c2fd0fe3..951c0cd9 100644
--- a/python-spec/requirements-py3.10.txt
+++ b/python-spec/requirements-py3.10.txt
@@ -5,7 +5,6 @@ exceptiongroup==1.2.1
 h5py==3.11.0
 llvmlite==0.43.0
 natsort==8.4.0
-numba==0.60.0
 numpy==2.0.0
 packaging==24.1
 pandas==2.2.2
diff --git a/python-spec/requirements-py3.11.txt b/python-spec/requirements-py3.11.txt
index 665528ff..0dd6c8ad 100644
--- a/python-spec/requirements-py3.11.txt
+++ b/python-spec/requirements-py3.11.txt
@@ -4,7 +4,6 @@ attrs==23.2.0
 h5py==3.11.0
 llvmlite==0.43.0
 natsort==8.4.0
-numba==0.60.0
 numpy==2.0.0
 packaging==24.1
 pandas==2.2.2
diff --git a/python-spec/requirements-py3.12.txt b/python-spec/requirements-py3.12.txt
index 0fe2f050..1d0585c5 100644
--- a/python-spec/requirements-py3.12.txt
+++ b/python-spec/requirements-py3.12.txt
@@ -4,7 +4,6 @@ attrs==23.2.0
 h5py==3.11.0
 llvmlite==0.43.0
 natsort==8.4.0
-numba==0.60.0
 numpy==2.0.0
 packaging==24.1
 pandas==2.2.2
diff --git a/python-spec/requirements-py3.9.txt b/python-spec/requirements-py3.9.txt
index db2c51ae..fc763a1c 100644
--- a/python-spec/requirements-py3.9.txt
+++ b/python-spec/requirements-py3.9.txt
@@ -7,7 +7,6 @@ h5py==3.11.0
 jmespath==1.0.1
 llvmlite==0.43.0
 natsort==8.4.0
-numba==0.60.0
 numpy==2.0.0
 packaging==24.1
 pandas==2.2.2
diff --git a/python-spec/src/somacore/query/_eager_iter.py b/python-spec/src/somacore/query/_eager_iter.py
deleted file mode 100644
index c42e52c1..00000000
--- a/python-spec/src/somacore/query/_eager_iter.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from concurrent import futures
-from typing import Iterator, Optional, TypeVar
-
-_T = TypeVar("_T")
-
-
-class EagerIterator(Iterator[_T]):
-    def __init__(
-        self,
-        iterator: Iterator[_T],
-        pool: Optional[futures.Executor] = None,
-    ):
-        super().__init__()
-        self.iterator = iterator
-        self._pool = pool or futures.ThreadPoolExecutor()
-        self._own_pool = pool is None
-        self._preload_future = self._pool.submit(self.iterator.__next__)
-
-    def __next__(self) -> _T:
-        stopped = False
-        try:
-            if self._preload_future.cancel():
-                # If `.cancel` returns True, cancellation was successful.
-                # The self.iterator.__next__ call has not yet been started,
-                # and will never be started, so we can compute next ourselves.
-                # This prevents deadlocks if the thread pool is too small
-                # and we can never create a preload thread.
-                return next(self.iterator)
-            # `.cancel` returned false, so the preload is already running.
-            # Just wait for it.
-            return self._preload_future.result()
-        except StopIteration:
-            self._cleanup()
-            stopped = True
-            raise
-        finally:
-            if not stopped:
-                # If we have more to do, go for the next thing.
-                self._preload_future = self._pool.submit(self.iterator.__next__)
-
-    def _cleanup(self) -> None:
-        if self._own_pool:
-            self._pool.shutdown()
-
-    def __del__(self) -> None:
-        # Ensure the threadpool is cleaned up in the case where the
-        # iterator is not exhausted. For more information on __del__:
-        # https://docs.python.org/3/reference/datamodel.html#object.__del__
-        self._cleanup()
-        super_del = getattr(super(), "__del__", lambda: None)
-        super_del()
diff --git a/python-spec/src/somacore/query/_fast_csr.py b/python-spec/src/somacore/query/_fast_csr.py
deleted file mode 100644
index 99a784d4..00000000
--- a/python-spec/src/somacore/query/_fast_csr.py
+++ /dev/null
@@ -1,275 +0,0 @@
-import os
-from concurrent import futures
-from typing import List, NamedTuple, Tuple, Type, cast
-
-import numba
-import numba.typed
-import numpy as np
-import numpy.typing as npt
-import pyarrow as pa
-from scipy import sparse
-
-from .. import data as scd
-from . import _eager_iter
-from . import types
-
-
-def read_csr(
-    matrix: scd.SparseNDArray,
-    obs_joinids: pa.IntegerArray,
-    var_joinids: pa.IntegerArray,
-    index_factory: types.IndexFactory,
-) -> "AccumulatedCSR":
-    if not isinstance(matrix, scd.SparseNDArray) or matrix.ndim != 2:
-        raise TypeError("Can only read from a 2D SparseNDArray")
-
-    max_workers = (os.cpu_count() or 4) + 2
-    with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
-        acc = _CSRAccumulator(
-            obs_joinids=obs_joinids,
-            var_joinids=var_joinids,
-            pool=pool,
-            index_factory=index_factory,
-        )
-        for tbl in _eager_iter.EagerIterator(
-            matrix.read((obs_joinids, var_joinids)).tables(),
-            pool=pool,
-        ):
-            acc.append(tbl["soma_dim_0"], tbl["soma_dim_1"], tbl["soma_data"])
-
-        return acc.finalize()
-
-
-class AccumulatedCSR(NamedTuple):
-    """
-    Private.
-
-    Return type for the _CSRAccumulator.finalize method.
-    Contains a sparse CSR's constituent elements.
-    """
-
-    data: npt.NDArray[np.number]
-    indptr: npt.NDArray[np.integer]
-    indices: npt.NDArray[np.integer]
-    shape: Tuple[int, int]
-
-    def to_scipy(self) -> sparse.csr_matrix:
-        """Create a Scipy sparse.csr_matrix from component elements.
-
-        Conceptually, this is identical to::
-
-            sparse.csr_matrix((data, indices, indptr), shape=shape)
-
-        This ugliness is to bypass the O(N) scan that
-        :meth:`sparse._cs_matrix.__init__`
-        does when a new compressed matrix is created.
-
-        See `SciPy bug 11496 `
-        for details.
-        """
-        matrix = sparse.csr_matrix.__new__(sparse.csr_matrix)
-        matrix.data = self.data
-        matrix.indptr = self.indptr
-        matrix.indices = self.indices
-        matrix._shape = self.shape
-        return matrix
-
-
-class _CSRAccumulator:
-    """
-    Fast accumulator of a CSR, based upon COO input.
-    """
-
-    def __init__(
-        self,
-        obs_joinids: pa.IntegerArray,
-        var_joinids: pa.IntegerArray,
-        pool: futures.Executor,
-        index_factory: types.IndexFactory,
-    ):
-        self.obs_joinids = obs_joinids
-        self.var_joinids = var_joinids
-        self.pool = pool
-
-        self.shape: Tuple[int, int] = (len(self.obs_joinids), len(self.var_joinids))
-        self.obs_indexer = index_factory(self.obs_joinids)
-        self.var_indexer = index_factory(self.var_joinids)
-        self.row_length: npt.NDArray[np.int64] = np.zeros(
-            (self.shape[0],), dtype=_select_dtype(self.shape[1])
-        )
-
-        # COO accumulated chunks, stored as list of triples (row_ind, col_ind, data)
-        self.coo_chunks: List[
-            Tuple[
-                npt.NDArray[np.integer],  # row_ind
-                npt.NDArray[np.integer],  # col_ind
-                npt.NDArray[np.number],  # data
-            ]
-        ] = []
-
-    def append(
-        self,
-        row_joinids: pa.Array,
-        col_joinids: pa.Array,
-        data: pa.Array,
-    ) -> None:
-        """
-        At accumulation time, do several things:
-
-        * re-index to positional indices, and if possible, cast to smaller dtype
-          to minimize memory footprint (at cost of some amount of time)
-        * accumulate column counts by row, i.e., build the basis of the indptr
-        * cache the tuple of data, row, col
-        """
-        rows_future = self.pool.submit(
-            _reindex_and_cast,
-            self.obs_indexer,
-            row_joinids.to_numpy(),
-            _select_dtype(self.shape[0]),
-        )
-        cols_future = self.pool.submit(
-            _reindex_and_cast,
-            self.var_indexer,
-            col_joinids.to_numpy(),
-            _select_dtype(self.shape[1]),
-        )
-        row_ind = rows_future.result()
-        col_ind = cols_future.result()
-        self.coo_chunks.append((row_ind, col_ind, data.to_numpy()))
-        _accum_row_length(self.row_length, row_ind)
-
-    def finalize(self) -> AccumulatedCSR:
-        nnz = sum(len(chunk[2]) for chunk in self.coo_chunks)
-        index_dtype = _select_dtype(nnz)
-        if nnz == 0:
-            # There is no way to infer matrix dtype, so use a default and return
-            # an empty matrix. Float32 is used as a default type, as it is most
-            # compatible with AnnData expectations.
-            empty = sparse.csr_matrix((0, 0), dtype=np.float32)
-            return AccumulatedCSR(
-                data=empty.data,
-                indptr=empty.indptr,
-                indices=empty.indices,
-                shape=self.shape,
-            )
-
-        # cumsum row lengths to get indptr
-        indptr = np.empty((self.shape[0] + 1,), dtype=index_dtype)
-        indptr[0:1] = 0
-        np.cumsum(self.row_length, out=indptr[1:])
-
-        # Parallel copy of data and column indices
-        indices = np.empty((nnz,), dtype=index_dtype)
-        data = np.empty((nnz,), dtype=self.coo_chunks[0][2].dtype)
-
-        # Empirically determined value. Needs to be large enough for reasonable
-        # concurrency, without excessive write cache conflict. Controls the
-        # number of rows that are processed in a single thread, and therefore
-        # is the primary tuning parameter related to concurrency.
-        row_rng_mask_bits = 18
-
-        n_jobs = (self.shape[0] >> row_rng_mask_bits) + 1
-        chunk_list = numba.typed.List(self.coo_chunks)
-        futures.wait(
-            [
-                self.pool.submit(
-                    _copy_chunklist_range,
-                    chunk_list,
-                    data,
-                    indices,
-                    indptr,
-                    row_rng_mask_bits,
-                    job,
-                )
-                for job in range(n_jobs)
-            ]
-        )
-        _finalize_indptr(indptr)
-        return AccumulatedCSR(
-            data=data, indptr=indptr, indices=indices, shape=self.shape
-        )
-
-
-@numba.jit(nopython=True, nogil=True)  # type: ignore[attr-defined]
-def _accum_row_length(
-    row_length: npt.NDArray[np.int64], row_ind: npt.NDArray[np.int64]
-) -> None:
-    for rind in row_ind:
-        row_length[rind] += 1
-
-
-@numba.jit(nopython=True, nogil=True)  # type: ignore[attr-defined]
-def _copy_chunk_range(
-    row_ind_chunk: npt.NDArray[np.signedinteger],
-    col_ind_chunk: npt.NDArray[np.signedinteger],
-    data_chunk: npt.NDArray[np.number],
-    data: npt.NDArray[np.number],
-    indices: npt.NDArray[np.signedinteger],
-    indptr: npt.NDArray[np.signedinteger],
-    row_rng_mask: int,
-    row_rng_val: int,
-):
-    for n in range(len(data_chunk)):
-        row = row_ind_chunk[n]
-        if (row & row_rng_mask) != row_rng_val:
-            continue
-        ptr = indptr[row]
-        indices[ptr] = col_ind_chunk[n]
-        data[ptr] = data_chunk[n]
-        indptr[row] += 1
-
-
-@numba.jit(nopython=True, nogil=True)  # type: ignore[attr-defined]
-def _copy_chunklist_range(
-    chunk_list: numba.typed.List,
-    data: npt.NDArray[np.number],
-    indices: npt.NDArray[np.signedinteger],
-    indptr: npt.NDArray[np.signedinteger],
-    row_rng_mask_bits: int,
-    job: int,
-):
-    assert row_rng_mask_bits >= 1 and row_rng_mask_bits < 64
-    row_rng_mask = (2**64 - 1) >> row_rng_mask_bits << row_rng_mask_bits
-    row_rng_val = job << row_rng_mask_bits
-    for row_ind_chunk, col_ind_chunk, data_chunk in chunk_list:
-        _copy_chunk_range(
-            row_ind_chunk,
-            col_ind_chunk,
-            data_chunk,
-            data,
-            indices,
-            indptr,
-            row_rng_mask,
-            row_rng_val,
-        )
-
-
-@numba.jit(nopython=True, nogil=True)  # type: ignore[attr-defined]
-def _finalize_indptr(indptr: npt.NDArray[np.signedinteger]):
-    prev = 0
-    for r in range(len(indptr)):
-        t = indptr[r]
-        indptr[r] = prev
-        prev = t
-
-
-def _select_dtype(
-    maxval: int,
-) -> Type[np.signedinteger]:
-    """
-    Ascertain the "best" dtype for a zero-based index. Given our
-    goal of minimizing memory use, "best" is currently defined as
-    smallest.
-    """
-    if maxval > np.iinfo(np.int32).max:
-        return np.int64
-    else:
-        return np.int32
-
-
-def _reindex_and_cast(
-    index: types.IndexLike, ids: npt.NDArray[np.int64], target_dtype: npt.DTypeLike
-) -> npt.NDArray[np.int64]:
-    return cast(
-        npt.NDArray[np.int64], index.get_indexer(ids).astype(target_dtype, copy=False)
-    )
diff --git a/python-spec/testing/test_eager_iter.py b/python-spec/testing/test_eager_iter.py
deleted file mode 100644
index 87f74b9c..00000000
--- a/python-spec/testing/test_eager_iter.py
+++ /dev/null
@@ -1,64 +0,0 @@
-import threading
-import unittest
-from concurrent import futures
-from unittest import mock
-
-from somacore.query import _eager_iter
-
-
-class EagerIterTest(unittest.TestCase):
-    def setUp(self):
-        super().setUp()
-        self.kiddie_pool = futures.ThreadPoolExecutor(1)
-        """Tiny thread pool for testing."""
-        self.verify_pool = futures.ThreadPoolExecutor(1)
-        """Separate thread pool so verification is not blocked."""
-
-    def tearDown(self):
-        self.verify_pool.shutdown(wait=False)
-        self.kiddie_pool.shutdown(wait=False)
-        super().tearDown()
-
-    def test_thread_starvation(self):
-        sem = threading.Semaphore()
-        try:
-            # Monopolize the threadpool.
-            sem.acquire()
-            self.kiddie_pool.submit(sem.acquire)
-            eager = _eager_iter.EagerIterator(iter("abc"), pool=self.kiddie_pool)
-            got_a = self.verify_pool.submit(lambda: next(eager))
-            self.assertEqual("a", got_a.result(0.1))
-            got_b = self.verify_pool.submit(lambda: next(eager))
-            self.assertEqual("b", got_b.result(0.1))
-            got_c = self.verify_pool.submit(lambda: next(eager))
-            self.assertEqual("c", got_c.result(0.1))
-            with self.assertRaises(StopIteration):
-                self.verify_pool.submit(lambda: next(eager)).result(0.1)
-        finally:
-            sem.release()
-
-    def test_nesting(self):
-        inner = _eager_iter.EagerIterator(iter("abc"), pool=self.kiddie_pool)
-        outer = _eager_iter.EagerIterator(inner, pool=self.kiddie_pool)
-        self.assertEqual(
-            "a, b, c", self.verify_pool.submit(", ".join, outer).result(0.1)
-        )
-
-    def test_exceptions(self):
-        flaky = mock.MagicMock()
-        flaky.__next__.side_effect = [1, 2, ValueError(), 3, 4]
-
-        eager_flaky = _eager_iter.EagerIterator(flaky, pool=self.kiddie_pool)
-        got_1 = self.verify_pool.submit(lambda: next(eager_flaky))
-        self.assertEqual(1, got_1.result(0.1))
-        got_2 = self.verify_pool.submit(lambda: next(eager_flaky))
-        self.assertEqual(2, got_2.result(0.1))
-        with self.assertRaises(ValueError):
-            self.verify_pool.submit(lambda: next(eager_flaky)).result(0.1)
-        got_3 = self.verify_pool.submit(lambda: next(eager_flaky))
-        self.assertEqual(3, got_3.result(0.1))
-        got_4 = self.verify_pool.submit(lambda: next(eager_flaky))
-        self.assertEqual(4, got_4.result(0.1))
-        for _ in range(5):
-            with self.assertRaises(StopIteration):
-                self.verify_pool.submit(lambda: next(eager_flaky)).result(0.1)

From 55788e7394d797722324255d1791103e593dfa0d Mon Sep 17 00:00:00 2001
From: Ryan Williams 
Date: Fri, 15 Nov 2024 13:55:21 -0500
Subject: [PATCH 3/4] `ExperimentAxisQuery`, `AxisIndexer` are explicitly
 `ABC`s

---
 python-spec/src/somacore/experiment.py  |  6 ++-
 python-spec/src/somacore/query/query.py | 68 ++++++++++++++++---------
 2 files changed, 48 insertions(+), 26 deletions(-)

diff --git a/python-spec/src/somacore/experiment.py b/python-spec/src/somacore/experiment.py
index e9100c7a..8869ffa7 100644
--- a/python-spec/src/somacore/experiment.py
+++ b/python-spec/src/somacore/experiment.py
@@ -1,4 +1,5 @@
-import abc
+from abc import ABC
+from abc import abstractmethod
 from typing import Generic, Optional, TypeVar
 
 from typing_extensions import Final
@@ -25,7 +26,7 @@
 class Experiment(  # type: ignore[misc]  # __eq__ false positive
     collection.BaseCollection[_RootSO],
     Generic[_DF, _MeasColl, _SceneColl, _RootSO],
-    abc.ABC,
+    ABC,
 ):
     """A collection subtype representing an annotated 2D matrix of measurements.
 
@@ -65,6 +66,7 @@ class Experiment(  # type: ignore[misc]  # __eq__ false positive
     ``scene_id`` and ``False`` otherwise.
     """
 
+    @abstractmethod
     def axis_query(
         self,
         measurement_name: str,
diff --git a/python-spec/src/somacore/query/query.py b/python-spec/src/somacore/query/query.py
index 0231fe1c..51e01efb 100644
--- a/python-spec/src/somacore/query/query.py
+++ b/python-spec/src/somacore/query/query.py
@@ -1,3 +1,5 @@
+from abc import ABC
+from abc import abstractmethod
 from typing import (
     Any,
     Mapping,
@@ -39,7 +41,7 @@ class AxisColumnNames(TypedDict, total=False):
     """var columns to use. All columns if ``None`` or not present."""
 
 
-class ExperimentAxisQuery:
+class ExperimentAxisQuery(ABC):
     """Axis-based query against a SOMA Experiment.
 
     ExperimentAxisQuery allows easy selection and extraction of data from a
@@ -53,6 +55,7 @@ class ExperimentAxisQuery:
     Lifecycle: maturing
     """
 
+    @abstractmethod
     def obs(
         self,
         *,
@@ -68,8 +71,9 @@ def obs(
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def var(
         self,
         *,
@@ -85,46 +89,52 @@ def var(
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def obs_joinids(self) -> pa.IntegerArray:
         """Returns ``obs`` ``soma_joinids`` as an Arrow array.
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def var_joinids(self) -> pa.IntegerArray:
         """Returns ``var`` ``soma_joinids`` as an Arrow array.
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
     @property
+    @abstractmethod
     def n_obs(self) -> int:
         """The number of ``obs`` axis query results.
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
     @property
+    @abstractmethod
     def n_vars(self) -> int:
         """The number of ``var`` axis query results.
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
     @property
+    @abstractmethod
     def indexer(self) -> "AxisIndexer":
         """A ``soma_joinid`` indexer for both ``obs`` and ``var`` axes.
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def X(
         self,
         layer_name: str,
@@ -149,50 +159,57 @@ def X(
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def obsp(self, layer: str) -> SparseRead:
         """Returns an ``obsp`` layer as a sparse read.
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def varp(self, layer: str) -> SparseRead:
         """Returns a ``varp`` layer as a sparse read.
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def obsm(self, layer: str) -> SparseRead:
         """Returns an ``obsm`` layer as a sparse read.
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def varm(self, layer: str) -> SparseRead:
         """Returns a ``varm`` layer as a sparse read.
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def obs_scene_ids(self) -> pa.Array:
         """Returns a pyarrow array with scene ids that contain obs from this
         query.
 
         Lifecycle: experimental
         """
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def var_scene_ids(self) -> pa.Array:
         """Return a pyarrow array with scene ids that contain var from this
         query.
 
         Lifecycle: experimental
         """
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def to_anndata(
         self,
         X_name: str,
@@ -230,10 +247,11 @@ def to_anndata(
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
     # Context management
 
+    @abstractmethod
     def close(self) -> None:
         """Releases resources associated with this query.
 
@@ -241,33 +259,35 @@ def close(self) -> None:
 
         Lifecycle: maturing
         """
-        raise NotImplementedError
+        ...
 
-    def __enter__(self) -> Self:
-        raise NotImplementedError
+    @abstractmethod
+    def __enter__(self) -> Self: ...
 
-    def __exit__(self, *_: Any) -> None:
-        raise NotImplementedError
+    @abstractmethod
+    def __exit__(self, *_: Any) -> None: ...
 
 
 Numpyable = Union[pa.Array, pa.ChunkedArray, npt.NDArray[np.int64]]
 """Things that can be converted to a NumPy array."""
 
 
-class AxisIndexer:
+class AxisIndexer(ABC):
     """
     Given a query, provides index-building services for obs/var axis.
 
     Lifecycle: maturing
     """
 
+    @abstractmethod
     def by_obs(self, coords: Numpyable) -> npt.NDArray[np.intp]:
         """Reindex the coords (soma_joinids) over the ``obs`` axis."""
-        raise NotImplementedError
+        ...
 
+    @abstractmethod
     def by_var(self, coords: Numpyable) -> npt.NDArray[np.intp]:
         """Reindex for the coords (soma_joinids) over the ``var`` axis."""
-        raise NotImplementedError
+        ...
 
 
 class Experimentish(Protocol):

From 89c44d717689aa44ce0ec1be0d46db1aec69b0c7 Mon Sep 17 00:00:00 2001
From: Ryan Williams 
Date: Fri, 15 Nov 2024 15:00:28 -0500
Subject: [PATCH 4/4] rm `somacore.ephemeral` (and associated tests)

---
 .../src/somacore/ephemeral/__init__.py        |  18 --
 .../src/somacore/ephemeral/collections.py     | 234 ------------------
 python-spec/testing/test_collection.py        |  44 ----
 python-spec/testing/test_mixin.py             |  39 ---
 4 files changed, 335 deletions(-)
 delete mode 100644 python-spec/src/somacore/ephemeral/__init__.py
 delete mode 100644 python-spec/src/somacore/ephemeral/collections.py
 delete mode 100644 python-spec/testing/test_collection.py
 delete mode 100644 python-spec/testing/test_mixin.py

diff --git a/python-spec/src/somacore/ephemeral/__init__.py b/python-spec/src/somacore/ephemeral/__init__.py
deleted file mode 100644
index 932f63f1..00000000
--- a/python-spec/src/somacore/ephemeral/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-"""In-memory-only implementations of SOMA types.
-
-These are meant for testing and exploration, where a user may want to do an
-ad-hoc analysis of multiple datasets without having to create a stored
-Collection.
-"""
-
-from .collections import Collection
-from .collections import Experiment
-from .collections import Measurement
-from .collections import Scene
-
-__all__ = (
-    "Collection",
-    "Experiment",
-    "Measurement",
-    "Scene",
-)
diff --git a/python-spec/src/somacore/ephemeral/collections.py b/python-spec/src/somacore/ephemeral/collections.py
deleted file mode 100644
index 68a224d1..00000000
--- a/python-spec/src/somacore/ephemeral/collections.py
+++ /dev/null
@@ -1,234 +0,0 @@
-from typing import (
-    Any,
-    Dict,
-    Iterator,
-    NoReturn,
-    Optional,
-    TypeVar,
-)
-
-from typing_extensions import Literal, Self
-
-from .. import base
-from .. import collection
-from .. import coordinates
-from .. import data
-from .. import experiment
-from .. import measurement
-from .. import options
-from .. import scene
-from .. import spatial
-
-_Elem = TypeVar("_Elem", bound=base.SOMAObject)
-
-
-class BaseCollection(collection.BaseCollection[_Elem]):
-    """A memory-backed SOMA Collection for ad-hoc collection building.
-
-    This Collection implementation exists purely in memory. It can be used to
-    build ad-hoc SOMA Collections for one-off analyses, and to combine SOMA
-    datasets from different sources that cannot be added to a Collection that
-    is represented in storage.
-
-    Entries added to this Collection are not "owned" by the collection; their
-    lifecycle is still dictated by the place they were opened from. This
-    collection has no ``context`` and ``close``ing it does nothing.
-    """
-
-    __slots__ = ("_entries", "_metadata")
-
-    def __init__(self, *args: Any, **kwargs: _Elem):
-        """Creates a new Collection.
-
-        Arguments and kwargs are provided as in the ``dict`` constructor.
-        """
-        self._entries: Dict[str, _Elem] = dict(*args, **kwargs)
-        self._metadata: Dict[str, Any] = {}
-
-    @property
-    def uri(self) -> str:
-        return f"somacore:ephemeral-collection:{id(self):x}"
-
-    @property
-    def metadata(self) -> Dict[str, Any]:
-        return self._metadata
-
-    @classmethod
-    def open(cls, *args, **kwargs) -> NoReturn:
-        del args, kwargs  # All unused
-        raise TypeError(
-            "Ephemeral collections are in-memory only and cannot be opened."
-        )
-
-    @classmethod
-    def exists(cls, uri: str, *, context: Any = None) -> Literal[False]:
-        del uri, context  # All unused.
-        # Ephemeral collections are in-memory only and do not otherwise exist.
-        return False
-
-    @classmethod
-    def create(cls, *args, **kwargs) -> Self:
-        del args, kwargs  # All unused
-        # ThisCollection is in-memory only, so just return a new empty one.
-        return cls()
-
-    def add_new_collection(self, *args, **kwargs) -> NoReturn:
-        del args, kwargs  # All unused
-        # TODO: Should we be willing to create Collection-based child elements,
-        # like Measurement and Experiment?
-        raise TypeError(
-            "An ephemeral Collection cannot create its own children;"
-            " only existing SOMA objects may be added."
-        )
-
-    add_new_dataframe = add_new_collection
-    add_new_sparse_ndarray = add_new_collection
-    add_new_dense_ndarray = add_new_collection
-
-    @property
-    def closed(self) -> bool:
-        return False  # With no backing storage, there is nothing to close.
-
-    @property
-    def mode(self) -> options.OpenMode:
-        return "w"  # This collection is always writable.
-
-    def set(
-        self, key: str, value: _Elem, *, use_relative_uri: Optional[bool] = None
-    ) -> Self:
-        del use_relative_uri  # Ignored.
-        self._entries[key] = value
-        return self
-
-    def __getitem__(self, key: str) -> _Elem:
-        return self._entries[key]
-
-    def __delitem__(self, key: str) -> None:
-        del self._entries[key]
-
-    def __iter__(self) -> Iterator[str]:
-        return iter(self._entries)
-
-    def __len__(self) -> int:
-        return len(self._entries)
-
-
-class Collection(  # type: ignore[misc]  # __eq__ false positive
-    BaseCollection[_Elem], collection.Collection
-):
-    """An in-memory Collection imposing no semantics on the contents."""
-
-    __slots__ = ()
-
-
-_BasicAbstractMeasurement = measurement.Measurement[
-    data.DataFrame,
-    collection.Collection[data.NDArray],
-    collection.Collection[data.DenseNDArray],
-    collection.Collection[data.SparseNDArray],
-    base.SOMAObject,
-]
-"""The loosest possible constraint of the abstract Measurement type."""
-
-_BasicAbstractScene = scene.Scene[
-    spatial.MultiscaleImage,
-    spatial.PointCloudDataFrame,
-    spatial.GeometryDataFrame,
-    base.SOMAObject,
-]
-"""The loosest possible constraint of the abstract Scene type."""
-
-
-class Measurement(  # type: ignore[misc]  # __eq__ false positive
-    BaseCollection[base.SOMAObject], _BasicAbstractMeasurement
-):
-    """An in-memory Collection with Measurement semantics."""
-
-    __slots__ = ()
-
-
-class Scene(  # type: ignore[misc]   # __eq__ false positive
-    BaseCollection[base.SOMAObject], _BasicAbstractScene
-):
-    """An in-memory Collection with Scene semantics."""
-
-    __slots__ = ()
-
-    @property
-    def coordinate_space(self) -> coordinates.CoordinateSpace:
-        """Coordinate system for this scene."""
-        raise NotImplementedError()
-
-    @coordinate_space.setter
-    def coordinate_space(self, value: coordinates.CoordinateSpace) -> None:
-        raise NotImplementedError()
-
-    def add_new_geometry_dataframe(self, *args, **kwargs) -> spatial.GeometryDataFrame:
-        raise NotImplementedError()
-
-    def add_new_multiscale_image(self, *args, **kwargs) -> spatial.MultiscaleImage:
-        raise NotImplementedError()
-
-    def add_new_point_cloud_dataframe(
-        self, *args, **kwargs
-    ) -> spatial.PointCloudDataFrame:
-        raise NotImplementedError()
-
-    def set_transform_to_geometry_dataframe(
-        self, *args, **kwargs
-    ) -> spatial.GeometryDataFrame:
-        raise NotImplementedError()
-
-    def set_transform_to_multiscale_image(
-        self, *args, **kwargs
-    ) -> spatial.MultiscaleImage:
-        raise NotImplementedError()
-
-    def set_transform_to_point_cloud_dataframe(
-        self, *args, **kwargs
-    ) -> spatial.PointCloudDataFrame:
-        raise NotImplementedError()
-
-    def get_transform_from_geometry_dataframe(
-        self, *args, **kwargs
-    ) -> coordinates.CoordinateTransform:
-        raise NotImplementedError()
-
-    def get_transform_from_multiscale_image(
-        self, *args, **kwargs
-    ) -> coordinates.CoordinateTransform:
-        raise NotImplementedError()
-
-    def get_transform_from_point_cloud_dataframe(
-        self, *args, **kwargs
-    ) -> coordinates.CoordinateTransform:
-        raise NotImplementedError()
-
-    def get_transform_to_geometry_dataframe(
-        self, *args, **kwargs
-    ) -> coordinates.CoordinateTransform:
-        raise NotImplementedError()
-
-    def get_transform_to_multiscale_image(
-        self, *args, **kwargs
-    ) -> coordinates.CoordinateTransform:
-        raise NotImplementedError()
-
-    def get_transform_to_point_cloud_dataframe(
-        self, *args, **kwargs
-    ) -> coordinates.CoordinateTransform:
-        raise NotImplementedError()
-
-
-class Experiment(  # type: ignore[misc]  # __eq__ false positive
-    BaseCollection[base.SOMAObject],
-    experiment.Experiment[
-        data.DataFrame,
-        collection.Collection[_BasicAbstractMeasurement],
-        collection.Collection[_BasicAbstractScene],
-        base.SOMAObject,
-    ],
-):
-    """An in-memory Collection with Experiment semantics."""
-
-    __slots__ = ()
diff --git a/python-spec/testing/test_collection.py b/python-spec/testing/test_collection.py
deleted file mode 100644
index e997c6d2..00000000
--- a/python-spec/testing/test_collection.py
+++ /dev/null
@@ -1,44 +0,0 @@
-import unittest
-from typing import Any
-
-from somacore import ephemeral
-
-
-class EphemeralCollectionTest(unittest.TestCase):
-    def test_basic(self):
-        # Since the ephemeral Collection implementation is straightforward,
-        # this is just to ensure that we actually fulfill everything.
-
-        coll = ephemeral.Collection[Any]()
-        entry_a = ephemeral.Collection[Any]()
-        coll["a"] = entry_a
-        self.assertIs(entry_a, coll["a"])
-        del coll["a"]
-
-        md = coll.metadata
-
-        md["hello"] = "world"
-
-        self.assertEqual("world", coll.metadata["hello"])
-
-    def test_equality_identity(self):
-        # Ensures that only object identity is used to compare SOMA objects,
-        # and nothing else.
-        # If these were any other Mapping type, they would be `__eq__` here,
-        # since they both have the same (i.e., no) elements.
-        coll = ephemeral.Collection[Any]()
-        coll_2 = ephemeral.Collection[Any]()
-        self.assertNotEqual(coll, coll_2)
-        both = frozenset((coll, coll_2))
-        self.assertIn(coll, both)
-        self.assertIn(coll_2, both)
-
-    def test_method_resolution_order(self):
-        # Ensures that constant definitions interact correctly with the MRO.
-
-        m = ephemeral.Measurement()
-        self.assertEqual("SOMAMeasurement", m.soma_type)
-        exp = ephemeral.Experiment()
-        self.assertEqual("SOMAExperiment", exp.soma_type)
-        scene = ephemeral.Scene()
-        self.assertEqual("SOMAScene", scene.soma_type)
diff --git a/python-spec/testing/test_mixin.py b/python-spec/testing/test_mixin.py
deleted file mode 100644
index 6da3b2ad..00000000
--- a/python-spec/testing/test_mixin.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import unittest
-
-from somacore import _mixin
-from somacore import ephemeral
-
-
-class TestItem(unittest.TestCase):
-    def test_get(self):
-        the_a = _mixin.item(str)
-
-        class ItemHaver(ephemeral.Collection):
-            a = the_a
-            b = _mixin.item(int, "base_b")
-
-        self.assertIs(the_a, ItemHaver.a)
-
-        items = ItemHaver()
-        items["c"] = "d"
-
-        with self.assertRaises(AttributeError):
-            items.a
-        with self.assertRaises(AttributeError):
-            items.b
-        with self.assertRaises(AttributeError):
-            del items.a
-
-        self.assertEqual("d", items["c"])
-
-        items["a"] = "a"
-        items["base_b"] = 1
-        self.assertEqual("a", items.a)
-        self.assertEqual(1, items.b)
-
-        items.a = "hello"
-        items.b = 500
-
-        self.assertEqual(dict(a="hello", base_b=500, c="d"), dict(items))
-        del items.a
-        self.assertEqual(dict(base_b=500, c="d"), dict(items))