From 9e81f0754217ea8ea9d7a69bce5a8f07bf46e89d Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 6 Nov 2024 14:18:31 -0500 Subject: [PATCH] make `{experiment,query}.py` classes abstract --- python-spec/src/somacore/data.py | 2 + python-spec/src/somacore/experiment.py | 37 +- python-spec/src/somacore/query/__init__.py | 2 + python-spec/src/somacore/query/query.py | 725 ++------------------- python-spec/src/somacore/query/types.py | 10 +- python-spec/testing/test_query_axis.py | 23 - 6 files changed, 81 insertions(+), 718 deletions(-) diff --git a/python-spec/src/somacore/data.py b/python-spec/src/somacore/data.py index e0220c7e..0d278249 100644 --- a/python-spec/src/somacore/data.py +++ b/python-spec/src/somacore/data.py @@ -479,6 +479,8 @@ def write( Arrow table: a COO table, with columns named ``soma_dim_0``, ..., ``soma_dim_N`` and ``soma_data``. + platform_config: platform-specific configuration; keys are SOMA + implementation names. Returns: ``self``, to enable method chaining. diff --git a/python-spec/src/somacore/experiment.py b/python-spec/src/somacore/experiment.py index fbdcb517..e9100c7a 100644 --- a/python-spec/src/somacore/experiment.py +++ b/python-spec/src/somacore/experiment.py @@ -1,6 +1,7 @@ +import abc from typing import Generic, Optional, TypeVar -from typing_extensions import Final, Self +from typing_extensions import Final from . import _mixin from . import base @@ -9,6 +10,7 @@ from . import measurement from . import query from . import scene +from .query import ExperimentAxisQuery _DF = TypeVar("_DF", bound=data.DataFrame) """An implementation of a DataFrame.""" @@ -20,8 +22,10 @@ """The root SOMA object type of the implementation.""" -class Experiment( - collection.BaseCollection[_RootSO], Generic[_DF, _MeasColl, _SceneColl, _RootSO] +class Experiment( # type: ignore[misc] # __eq__ false positive + collection.BaseCollection[_RootSO], + Generic[_DF, _MeasColl, _SceneColl, _RootSO], + abc.ABC, ): """A collection subtype representing an annotated 2D matrix of measurements. @@ -33,22 +37,6 @@ class Experiment( Lifecycle: maturing """ - # This class is implemented as a mixin to be used with SOMA classes. - # For example, a SOMA implementation would look like this: - # - # # This type-ignore comment will always be needed due to limitations - # # of type annotations; it is (currently) expected. - # class Experiment( # type: ignore[type-var] - # ImplBaseCollection[ImplSOMAObject], - # somacore.Experiment[ - # ImplDataFrame, # _DF - # ImplMeasurement, # _MeasColl - # ImplScene, # _SceneColl - # ImplSOMAObject, # _RootSO - # ], - # ): - # ... - __slots__ = () soma_type: Final = "SOMAExperiment" # type: ignore[misc] @@ -83,18 +71,11 @@ def axis_query( *, obs_query: Optional[query.AxisQuery] = None, var_query: Optional[query.AxisQuery] = None, - ) -> "query.ExperimentAxisQuery[Self]": + ) -> ExperimentAxisQuery: """Creates an axis query over this experiment. See :class:`query.ExperimentAxisQuery` for details on usage. Lifecycle: maturing """ - # mypy doesn't quite understand descriptors so it issues a spurious - # error here. - return query.ExperimentAxisQuery( # type: ignore[type-var] - self, - measurement_name, - obs_query=obs_query or query.AxisQuery(), - var_query=var_query or query.AxisQuery(), - ) + raise NotImplementedError diff --git a/python-spec/src/somacore/query/__init__.py b/python-spec/src/somacore/query/__init__.py index 41598d9a..808b7048 100644 --- a/python-spec/src/somacore/query/__init__.py +++ b/python-spec/src/somacore/query/__init__.py @@ -3,10 +3,12 @@ ExperimentAxisQuery = query.ExperimentAxisQuery AxisColumnNames = query.AxisColumnNames +AxisIndexer = query.AxisIndexer AxisQuery = axis.AxisQuery __all__ = ( "ExperimentAxisQuery", "AxisColumnNames", + "AxisIndexer", "AxisQuery", ) diff --git a/python-spec/src/somacore/query/query.py b/python-spec/src/somacore/query/query.py index 08d6b6f0..0231fe1c 100644 --- a/python-spec/src/somacore/query/query.py +++ b/python-spec/src/somacore/query/query.py @@ -1,39 +1,29 @@ -import enum -from concurrent import futures from typing import ( Any, - Callable, - Dict, - Generic, Mapping, Optional, Sequence, - Tuple, - TypeVar, Union, - cast, - overload, ) -import anndata -import attrs import numpy as np import numpy.typing as npt -import pandas as pd import pyarrow as pa -import pyarrow.compute as pacomp -from scipy import sparse -from typing_extensions import Literal, Protocol, Self, TypedDict +from anndata import AnnData +from typing_extensions import Protocol, Self, TypedDict -from .. import data +from .. import DataFrame +from .. import ReadIter +from .. import SparseRead from .. import measurement -from .. import options from .. import types as base_types -from . import _fast_csr -from . import axis -from . import types +from ..options import BatchSize +from ..options import PlatformConfig +from ..options import ReadPartitions +from ..options import ResultOrder +from ..options import ResultOrderStr -_RO_AUTO = options.ResultOrder.AUTO +_RO_AUTO = ResultOrder.AUTO class AxisColumnNames(TypedDict, total=False): @@ -49,11 +39,7 @@ class AxisColumnNames(TypedDict, total=False): """var columns to use. All columns if ``None`` or not present.""" -_Exp = TypeVar("_Exp", bound="_Experimentish") -"""TypeVar for the concrete type of an experiment-like object.""" - - -class ExperimentAxisQuery(Generic[_Exp]): +class ExperimentAxisQuery: """Axis-based query against a SOMA Experiment. ExperimentAxisQuery allows easy selection and extraction of data from a @@ -64,121 +50,56 @@ class ExperimentAxisQuery(Generic[_Exp]): var value and/or coordinates. Slicing on :class:`SparseNDArray` ``X`` matrices is supported; :class:`DenseNDArray` is not supported at this time. - IMPORTANT: this class is not thread-safe. - - IMPORTANT: this query class assumes it can store the full result of both - axis dataframe queries in memory, and only provides incremental access to - the underlying X NDArray. API features such as ``n_obs`` and ``n_vars`` - codify this in the API. - - IMPORTANT: you must call ``close()`` on any instance of this class to - release underlying resources. The ExperimentAxisQuery is a context manager, - and it is recommended that you use the following pattern to make this easy - and safe:: - - with ExperimentAxisQuery(...) as query: - ... - - This base query implementation is designed to work against any SOMA - implementation that fulfills the basic APIs. A SOMA implementation may - include a custom query implementation optimized for its own use. - Lifecycle: maturing """ - def __init__( - self, - experiment: _Exp, - measurement_name: str, - *, - obs_query: axis.AxisQuery = axis.AxisQuery(), - var_query: axis.AxisQuery = axis.AxisQuery(), - index_factory: types.IndexFactory = pd.Index, - ): - if measurement_name not in experiment.ms: - raise ValueError("Measurement does not exist in the experiment") - - # Users often like to pass `foo=None` and we should let them - obs_query = obs_query or axis.AxisQuery() - var_query = var_query or axis.AxisQuery() - - self.experiment = experiment - self.measurement_name = measurement_name - - self._matrix_axis_query = _MatrixAxisQuery(obs=obs_query, var=var_query) - self._joinids = _JoinIDCache(self) - self._indexer = AxisIndexer( - self, - index_factory=index_factory, - ) - self._index_factory = index_factory - self._threadpool_: Optional[futures.ThreadPoolExecutor] = None - def obs( self, *, column_names: Optional[Sequence[str]] = None, - batch_size: options.BatchSize = options.BatchSize(), - partitions: Optional[options.ReadPartitions] = None, - result_order: options.ResultOrderStr = _RO_AUTO, - platform_config: Optional[options.PlatformConfig] = None, - ) -> data.ReadIter[pa.Table]: + batch_size: BatchSize = BatchSize(), + partitions: Optional[ReadPartitions] = None, + result_order: ResultOrderStr = _RO_AUTO, + platform_config: Optional[PlatformConfig] = None, + ) -> ReadIter[pa.Table]: """Returns ``obs`` as an `Arrow table `_ iterator. Lifecycle: maturing """ - obs_query = self._matrix_axis_query.obs - return self._obs_df.read( - obs_query.coords, - value_filter=obs_query.value_filter, - column_names=column_names, - batch_size=batch_size, - partitions=partitions, - result_order=result_order, - platform_config=platform_config, - ) + raise NotImplementedError def var( self, *, column_names: Optional[Sequence[str]] = None, - batch_size: options.BatchSize = options.BatchSize(), - partitions: Optional[options.ReadPartitions] = None, - result_order: options.ResultOrderStr = _RO_AUTO, - platform_config: Optional[options.PlatformConfig] = None, - ) -> data.ReadIter[pa.Table]: + batch_size: BatchSize = BatchSize(), + partitions: Optional[ReadPartitions] = None, + result_order: ResultOrderStr = _RO_AUTO, + platform_config: Optional[PlatformConfig] = None, + ) -> ReadIter[pa.Table]: """Returns ``var`` as an `Arrow table `_ iterator. Lifecycle: maturing """ - var_query = self._matrix_axis_query.var - return self._var_df.read( - var_query.coords, - value_filter=var_query.value_filter, - column_names=column_names, - batch_size=batch_size, - partitions=partitions, - result_order=result_order, - platform_config=platform_config, - ) + raise NotImplementedError def obs_joinids(self) -> pa.IntegerArray: """Returns ``obs`` ``soma_joinids`` as an Arrow array. Lifecycle: maturing """ - return self._joinids.obs + raise NotImplementedError def var_joinids(self) -> pa.IntegerArray: """Returns ``var`` ``soma_joinids`` as an Arrow array. Lifecycle: maturing """ - return self._joinids.var + raise NotImplementedError @property def n_obs(self) -> int: @@ -186,7 +107,7 @@ def n_obs(self) -> int: Lifecycle: maturing """ - return len(self.obs_joinids()) + raise NotImplementedError @property def n_vars(self) -> int: @@ -194,7 +115,7 @@ def n_vars(self) -> int: Lifecycle: maturing """ - return len(self.var_joinids()) + raise NotImplementedError @property def indexer(self) -> "AxisIndexer": @@ -202,71 +123,59 @@ def indexer(self) -> "AxisIndexer": Lifecycle: maturing """ - return self._indexer + raise NotImplementedError def X( self, layer_name: str, *, - batch_size: options.BatchSize = options.BatchSize(), - partitions: Optional[options.ReadPartitions] = None, - result_order: options.ResultOrderStr = _RO_AUTO, - platform_config: Optional[options.PlatformConfig] = None, - ) -> data.SparseRead: + batch_size: BatchSize = BatchSize(), + partitions: Optional[ReadPartitions] = None, + result_order: ResultOrderStr = _RO_AUTO, + platform_config: Optional[PlatformConfig] = None, + ) -> SparseRead: """Returns an ``X`` layer as a sparse read. Args: layer_name: The X layer name to return. batch_size: The size of batches that should be returned from a read. - See :class:`options.BatchSize` for details. + See :class:`BatchSize` for details. partitions: Specifies that this is part of a partitioned read, and which partition to include, if present. result_order: the order to return results, specified as a - :class:`~options.ResultOrder` or its string value. + :class:`~ResultOrder` or its string value. + platform_config: platform-specific configuration; keys are SOMA + implementation names. Lifecycle: maturing """ - try: - x_layer = self._ms.X[layer_name] - except KeyError as ke: - raise KeyError(f"{layer_name} is not present in X") from ke - if not isinstance(x_layer, data.SparseNDArray): - raise TypeError("X layers may only be sparse arrays") - - self._joinids.preload(self._threadpool) - return x_layer.read( - (self._joinids.obs, self._joinids.var), - batch_size=batch_size, - partitions=partitions, - result_order=result_order, - platform_config=platform_config, - ) - - def obsp(self, layer: str) -> data.SparseRead: + raise NotImplementedError + + def obsp(self, layer: str) -> SparseRead: """Returns an ``obsp`` layer as a sparse read. Lifecycle: maturing """ - return self._axisp_inner(_Axis.OBS, layer) + raise NotImplementedError - def varp(self, layer: str) -> data.SparseRead: + def varp(self, layer: str) -> SparseRead: """Returns a ``varp`` layer as a sparse read. Lifecycle: maturing """ - return self._axisp_inner(_Axis.VAR, layer) + raise NotImplementedError - def obsm(self, layer: str) -> data.SparseRead: + def obsm(self, layer: str) -> SparseRead: """Returns an ``obsm`` layer as a sparse read. Lifecycle: maturing """ - return self._axism_inner(_Axis.OBS, layer) + raise NotImplementedError - def varm(self, layer: str) -> data.SparseRead: + def varm(self, layer: str) -> SparseRead: """Returns a ``varm`` layer as a sparse read. Lifecycle: maturing """ - return self._axism_inner(_Axis.VAR, layer) + raise NotImplementedError def obs_scene_ids(self) -> pa.Array: """Returns a pyarrow array with scene ids that contain obs from this @@ -274,20 +183,7 @@ def obs_scene_ids(self) -> pa.Array: Lifecycle: experimental """ - try: - obs_scene = self.experiment.obs_spatial_presence - except KeyError as ke: - raise KeyError("Missing obs_scene") from ke - if not isinstance(obs_scene, data.DataFrame): - raise TypeError("obs_scene must be a dataframe.") - - full_table = obs_scene.read( - coords=((_Axis.OBS.getattr_from(self._joinids), slice(None))), - result_order=options.ResultOrder.COLUMN_MAJOR, - value_filter="data != 0", - ).concat() - - return pacomp.unique(full_table["scene_id"]) + raise NotImplementedError def var_scene_ids(self) -> pa.Array: """Return a pyarrow array with scene ids that contain var from this @@ -295,20 +191,7 @@ def var_scene_ids(self) -> pa.Array: Lifecycle: experimental """ - try: - var_scene = self._ms.var_spatial_presence - except KeyError as ke: - raise KeyError("Missing var_scene") from ke - if not isinstance(var_scene, data.DataFrame): - raise TypeError("var_scene must be a dataframe.") - - full_table = var_scene.read( - coords=((_Axis.OBS.getattr_from(self._joinids), slice(None))), - result_order=options.ResultOrder.COLUMN_MAJOR, - value_filter="data != 0", - ).concat() - - return pacomp.unique(full_table["scene_id"]) + raise NotImplementedError def to_anndata( self, @@ -321,7 +204,7 @@ def to_anndata( varm_layers: Sequence[str] = (), varp_layers: Sequence[str] = (), drop_levels: bool = False, - ) -> anndata.AnnData: + ) -> AnnData: """ Executes the query and return result as an ``AnnData`` in-memory object. @@ -347,26 +230,7 @@ def to_anndata( Lifecycle: maturing """ - ad = self._read( - X_name, - column_names=column_names or AxisColumnNames(obs=None, var=None), - X_layers=X_layers, - obsm_layers=obsm_layers, - obsp_layers=obsp_layers, - varm_layers=varm_layers, - varp_layers=varp_layers, - ).to_anndata() - - # Drop unused categories on axis dataframes if requested - if drop_levels: - for name in ad.obs: - if ad.obs[name].dtype.name == "category": - ad.obs[name] = ad.obs[name].cat.remove_unused_categories() - for name in ad.var: - if ad.var[name].dtype.name == "category": - ad.var[name] = ad.var[name].cat.remove_unused_categories() - - return ad + raise NotImplementedError # Context management @@ -377,436 +241,19 @@ def close(self) -> None: Lifecycle: maturing """ - # Because this may be called during ``__del__`` when we might be getting - # disassembled, sometimes ``_threadpool_`` is simply missing. - # Only try to shut it down if it still exists. - pool = getattr(self, "_threadpool_", None) - if pool is None: - return - pool.shutdown() - self._threadpool_ = None + raise NotImplementedError def __enter__(self) -> Self: - return self + raise NotImplementedError def __exit__(self, *_: Any) -> None: - self.close() - - def __del__(self) -> None: - """Ensure that we're closed when our last ref disappears.""" - self.close() - # If any superclass in our MRO has a __del__, call it. - sdel = getattr(super(), "__del__", lambda: None) - sdel() - - # Internals - - def _read( - self, - X_name: str, - *, - column_names: AxisColumnNames, - X_layers: Sequence[str], - obsm_layers: Sequence[str] = (), - obsp_layers: Sequence[str] = (), - varm_layers: Sequence[str] = (), - varp_layers: Sequence[str] = (), - ) -> "_AxisQueryResult": - """Reads the entire query result in memory. - - This is a low-level routine intended to be used by loaders for other - in-core formats, such as AnnData, which can be created from the - resulting objects. - - Args: - X_name: The X layer to read and return in the ``X`` slot. - column_names: The columns in the ``var`` and ``obs`` dataframes - to read. - X_layers: Additional X layers to read and return - in the ``layers`` slot. - obsm_layers: - Additional obsm layers to read and return in the obsm slot. - obsp_layers: - Additional obsp layers to read and return in the obsp slot. - varm_layers: - Additional varm layers to read and return in the varm slot. - varp_layers: - Additional varp layers to read and return in the varp slot. - """ - x_collection = self._ms.X - all_x_names = [X_name] + list(X_layers) - all_x_arrays: Dict[str, data.SparseNDArray] = {} - for _xname in all_x_names: - if not isinstance(_xname, str) or not _xname: - raise ValueError("X layer names must be specified as a string.") - if _xname not in x_collection: - raise ValueError("Unknown X layer name") - x_array = x_collection[_xname] - if not isinstance(x_array, data.SparseNDArray): - raise NotImplementedError("Dense array unsupported") - all_x_arrays[_xname] = x_array - - def _read_axis_mappings(fn, axis, keys: Sequence[str]) -> Dict[str, np.ndarray]: - return {key: fn(axis, key) for key in keys} - - obsm_ft = self._threadpool.submit( - _read_axis_mappings, self._axism_inner_ndarray, _Axis.OBS, obsm_layers - ) - obsp_ft = self._threadpool.submit( - _read_axis_mappings, self._axisp_inner_ndarray, _Axis.OBS, obsp_layers - ) - varm_ft = self._threadpool.submit( - _read_axis_mappings, self._axism_inner_ndarray, _Axis.VAR, varm_layers - ) - varp_ft = self._threadpool.submit( - _read_axis_mappings, self._axisp_inner_ndarray, _Axis.VAR, varp_layers - ) - - obs_table, var_table = self._read_both_axes(column_names) - - x_matrices = { - _xname: _fast_csr.read_csr( - all_x_arrays[_xname], - self.obs_joinids(), - self.var_joinids(), - index_factory=self._index_factory, - ).to_scipy() - for _xname in all_x_arrays - } - - x = x_matrices.pop(X_name) - - obs = obs_table.to_pandas() - obs.index = obs.index.astype(str) - - var = var_table.to_pandas() - var.index = var.index.astype(str) - - return _AxisQueryResult( - obs=obs, - var=var, - X=x, - obsm=obsm_ft.result(), - obsp=obsp_ft.result(), - varm=varm_ft.result(), - varp=varp_ft.result(), - X_layers=x_matrices, - ) - - def _read_both_axes( - self, - column_names: AxisColumnNames, - ) -> Tuple[pa.Table, pa.Table]: - """Reads both axes in their entirety, ensuring soma_joinid is retained.""" - obs_ft = self._threadpool.submit( - self._read_axis_dataframe, - _Axis.OBS, - column_names, - ) - var_ft = self._threadpool.submit( - self._read_axis_dataframe, - _Axis.VAR, - column_names, - ) - return obs_ft.result(), var_ft.result() - - def _read_axis_dataframe( - self, - axis: "_Axis", - axis_column_names: AxisColumnNames, - ) -> pa.Table: - """Reads the specified axis. Will cache join IDs if not present.""" - column_names = axis_column_names.get(axis.value) - - axis_df = axis.getattr_from(self, pre="_", suf="_df") - assert isinstance(axis_df, data.DataFrame) - axis_query = axis.getattr_from(self._matrix_axis_query) - - # If we can cache join IDs, prepare to add them to the cache. - joinids_cached = self._joinids._is_cached(axis) - query_columns = column_names - added_soma_joinid_to_columns = False - if ( - not joinids_cached - and column_names is not None - and "soma_joinid" not in column_names - ): - # If we want to fill the join ID cache, ensure that we query the - # soma_joinid column so that it is included in the results. - # We'll filter it out later. - query_columns = ["soma_joinid"] + list(column_names) - added_soma_joinid_to_columns = True - - # Do the actual query. - arrow_table = axis_df.read( - axis_query.coords, - value_filter=axis_query.value_filter, - column_names=query_columns, - ).concat() - - # Update the cache if needed. We can do this because no matter what - # other columns are queried for, the contents of the ``soma_joinid`` - # column will be the same and can be safely stored. - if not joinids_cached: - setattr( - self._joinids, - axis.value, - arrow_table.column("soma_joinid").combine_chunks(), - ) - - # Drop soma_joinid column if we added it solely for use in filling - # the joinid cache. - if added_soma_joinid_to_columns: - arrow_table = arrow_table.drop(["soma_joinid"]) - return arrow_table - - def _axisp_inner( - self, - axis: "_Axis", - layer: str, - ) -> data.SparseRead: - p_name = f"{axis.value}p" - try: - axisp = axis.getitem_from(self._ms, suf="p") - except KeyError as ke: - raise ValueError(f"Measurement does not contain {p_name} data") from ke - - try: - ap_layer = axisp[layer] - except KeyError as ke: - raise ValueError(f"layer {layer!r} is not available in {p_name}") from ke - if not isinstance(ap_layer, data.SparseNDArray): - raise TypeError( - f"Unexpected SOMA type {type(ap_layer).__name__}" - f" stored in {p_name} layer {layer!r}" - ) - - joinids = axis.getattr_from(self._joinids) - return ap_layer.read((joinids, joinids)) - - def _axism_inner( - self, - axis: "_Axis", - layer: str, - ) -> data.SparseRead: - m_name = f"{axis.value}m" - - try: - axism = axis.getitem_from(self._ms, suf="m") - except KeyError: - raise ValueError(f"Measurement does not contain {m_name} data") from None - - try: - axism_layer = axism[layer] - except KeyError as ke: - raise ValueError(f"layer {layer!r} is not available in {m_name}") from ke - - if not isinstance(axism_layer, data.SparseNDArray): - raise TypeError(f"Unexpected SOMA type stored in '{m_name}' layer") - - joinids = axis.getattr_from(self._joinids) - return axism_layer.read((joinids, slice(None))) - - def _convert_to_ndarray( - self, axis: "_Axis", table: pa.Table, n_row: int, n_col: int - ) -> np.ndarray: - indexer = cast( - Callable[[_Numpyable], npt.NDArray[np.intp]], - axis.getattr_from(self.indexer, pre="by_"), - ) - idx = indexer(table["soma_dim_0"]) - z: np.ndarray = np.zeros(n_row * n_col, dtype=np.float32) - np.put(z, idx * n_col + table["soma_dim_1"], table["soma_data"]) - return z.reshape(n_row, n_col) - - def _axisp_inner_ndarray( - self, - axis: "_Axis", - layer: str, - ) -> np.ndarray: - n_row = n_col = len(axis.getattr_from(self._joinids)) - - table = self._axisp_inner(axis, layer).tables().concat() - return self._convert_to_ndarray(axis, table, n_row, n_col) - - def _axism_inner_ndarray( - self, - axis: "_Axis", - layer: str, - ) -> np.ndarray: - table = self._axism_inner(axis, layer).tables().concat() - - n_row = len(axis.getattr_from(self._joinids)) - n_col = len(table["soma_dim_1"].unique()) + raise NotImplementedError - return self._convert_to_ndarray(axis, table, n_row, n_col) - @property - def _obs_df(self) -> data.DataFrame: - return self.experiment.obs - - @property - def _ms(self) -> measurement.Measurement: - return self.experiment.ms[self.measurement_name] - - @property - def _var_df(self) -> data.DataFrame: - return self._ms.var - - @property - def _threadpool(self) -> futures.ThreadPoolExecutor: - """ - Returns the threadpool provided by the experiment's context. - If not available, creates a thread pool just in time.""" - context = self.experiment.context - if context and context.threadpool: - return context.threadpool - - if self._threadpool_ is None: - self._threadpool_ = futures.ThreadPoolExecutor() - return self._threadpool_ - - -# Private internal data structures - - -@attrs.define(frozen=True) -class _AxisQueryResult: - """The result of running :meth:`ExperimentAxisQuery.read`. Private.""" - - obs: pd.DataFrame - """Experiment.obs query slice, as a pandas DataFrame""" - var: pd.DataFrame - """Experiment.ms[...].var query slice, as a pandas DataFrame""" - X: sparse.csr_matrix - """Experiment.ms[...].X[...] query slice, as an SciPy sparse.csr_matrix """ - X_layers: Dict[str, sparse.csr_matrix] = attrs.field(factory=dict) - """Any additional X layers requested, as SciPy sparse.csr_matrix(s)""" - obsm: Dict[str, np.ndarray] = attrs.field(factory=dict) - """Experiment.obsm query slice, as a numpy ndarray""" - obsp: Dict[str, np.ndarray] = attrs.field(factory=dict) - """Experiment.obsp query slice, as a numpy ndarray""" - varm: Dict[str, np.ndarray] = attrs.field(factory=dict) - """Experiment.varm query slice, as a numpy ndarray""" - varp: Dict[str, np.ndarray] = attrs.field(factory=dict) - """Experiment.varp query slice, as a numpy ndarray""" - - def to_anndata(self) -> anndata.AnnData: - return anndata.AnnData( - X=self.X, - obs=self.obs, - var=self.var, - obsm=(self.obsm or None), - obsp=(self.obsp or None), - varm=(self.varm or None), - varp=(self.varp or None), - layers=(self.X_layers or None), - ) - - -class _Axis(enum.Enum): - OBS = "obs" - VAR = "var" - - @property - def value(self) -> Literal["obs", "var"]: - return super().value - - @overload - def getattr_from(self, __source: "_HasObsVar[_T]") -> "_T": ... - - @overload - def getattr_from( - self, __source: Any, *, pre: Literal[""], suf: Literal[""] - ) -> object: ... - - @overload - def getattr_from( - self, __source: Any, *, pre: str = ..., suf: str = ... - ) -> object: ... - - def getattr_from(self, __source: Any, *, pre: str = "", suf: str = "") -> object: - """Equivalent to ``something.
``."""
-        return getattr(__source, pre + self.value + suf)
-
-    def getitem_from(
-        self, __source: Mapping[str, "_T"], *, pre: str = "", suf: str = ""
-    ) -> "_T":
-        """Equivalent to ``something[pre + "obs"/"var" + suf]``."""
-        return __source[pre + self.value + suf]
-
-
-@attrs.define(frozen=True)
-class _MatrixAxisQuery:
-    """The per-axis user query definition. Private."""
-
-    obs: axis.AxisQuery
-    var: axis.AxisQuery
-
-
-@attrs.define
-class _JoinIDCache:
-    """A cache for per-axis join ids in the query. Private."""
-
-    owner: ExperimentAxisQuery
-
-    _cached_obs: Optional[pa.IntegerArray] = None
-    _cached_var: Optional[pa.IntegerArray] = None
-
-    def _is_cached(self, axis: _Axis) -> bool:
-        field = "_cached_" + axis.value
-        return getattr(self, field) is not None
-
-    def preload(self, pool: futures.ThreadPoolExecutor) -> None:
-        if self._cached_obs is not None and self._cached_var is not None:
-            return
-        obs_ft = pool.submit(lambda: self.obs)
-        var_ft = pool.submit(lambda: self.var)
-        # Wait for them and raise in case of error.
-        obs_ft.result()
-        var_ft.result()
-
-    @property
-    def obs(self) -> pa.IntegerArray:
-        """Join IDs for the obs axis. Will load and cache if not already."""
-        if not self._cached_obs:
-            self._cached_obs = _load_joinids(
-                self.owner._obs_df, self.owner._matrix_axis_query.obs
-            )
-        return self._cached_obs
-
-    @obs.setter
-    def obs(self, val: pa.IntegerArray) -> None:
-        self._cached_obs = val
-
-    @property
-    def var(self) -> pa.IntegerArray:
-        """Join IDs for the var axis. Will load and cache if not already."""
-        if not self._cached_var:
-            self._cached_var = _load_joinids(
-                self.owner._var_df, self.owner._matrix_axis_query.var
-            )
-        return self._cached_var
-
-    @var.setter
-    def var(self, val: pa.IntegerArray) -> None:
-        self._cached_var = val
-
-
-def _load_joinids(df: data.DataFrame, axq: axis.AxisQuery) -> pa.IntegerArray:
-    tbl = df.read(
-        axq.coords,
-        value_filter=axq.value_filter,
-        column_names=["soma_joinid"],
-    ).concat()
-    return tbl.column("soma_joinid").combine_chunks()
-
-
-_Numpyable = Union[pa.Array, pa.ChunkedArray, npt.NDArray[np.int64]]
+Numpyable = Union[pa.Array, pa.ChunkedArray, npt.NDArray[np.int64]]
 """Things that can be converted to a NumPy array."""
 
 
-@attrs.define
 class AxisIndexer:
     """
     Given a query, provides index-building services for obs/var axis.
@@ -814,72 +261,26 @@ class AxisIndexer:
     Lifecycle: maturing
     """
 
-    query: ExperimentAxisQuery
-    _index_factory: types.IndexFactory
-    _cached_obs: Optional[types.IndexLike] = None
-    _cached_var: Optional[types.IndexLike] = None
-
-    @property
-    def _obs_index(self) -> types.IndexLike:
-        """Private. Return an index for the ``obs`` axis."""
-        if self._cached_obs is None:
-            self._cached_obs = self._index_factory(self.query.obs_joinids().to_numpy())
-        return self._cached_obs
-
-    @property
-    def _var_index(self) -> types.IndexLike:
-        """Private. Return an index for the ``var`` axis."""
-        if self._cached_var is None:
-            self._cached_var = self._index_factory(self.query.var_joinids().to_numpy())
-        return self._cached_var
-
-    def by_obs(self, coords: _Numpyable) -> npt.NDArray[np.intp]:
+    def by_obs(self, coords: Numpyable) -> npt.NDArray[np.intp]:
         """Reindex the coords (soma_joinids) over the ``obs`` axis."""
-        return self._obs_index.get_indexer(_to_numpy(coords))
+        raise NotImplementedError
 
-    def by_var(self, coords: _Numpyable) -> npt.NDArray[np.intp]:
+    def by_var(self, coords: Numpyable) -> npt.NDArray[np.intp]:
         """Reindex for the coords (soma_joinids) over the ``var`` axis."""
-        return self._var_index.get_indexer(_to_numpy(coords))
-
+        raise NotImplementedError
 
-def _to_numpy(it: _Numpyable) -> np.ndarray:
-    if isinstance(it, np.ndarray):
-        return it
-    return it.to_numpy()
 
-
-#
-# Type shenanigans
-#
-
-_T = TypeVar("_T")
-_T_co = TypeVar("_T_co", covariant=True)
-
-
-class _Experimentish(Protocol):
+class Experimentish(Protocol):
     """The API we need from an Experiment."""
 
     @property
     def ms(self) -> Mapping[str, measurement.Measurement]: ...
 
     @property
-    def obs(self) -> data.DataFrame: ...
+    def obs(self) -> DataFrame: ...
 
     @property
     def context(self) -> Optional[base_types.ContextBase]: ...
 
     @property
-    def obs_spatial_presence(self) -> data.DataFrame: ...
-
-
-class _HasObsVar(Protocol[_T_co]):
-    """Something which has an ``obs`` and ``var`` field.
-
-    Used to give nicer type inference in :meth:`_Axis.getattr_from`.
-    """
-
-    @property
-    def obs(self) -> _T_co: ...
-
-    @property
-    def var(self) -> _T_co: ...
+    def obs_spatial_presence(self) -> DataFrame: ...
diff --git a/python-spec/src/somacore/query/types.py b/python-spec/src/somacore/query/types.py
index f5caa439..e8de994b 100644
--- a/python-spec/src/somacore/query/types.py
+++ b/python-spec/src/somacore/query/types.py
@@ -1,13 +1,13 @@
 """Common types used across SOMA query modules."""
 
-from typing import Any, Callable, Union
+from typing import Callable, Union
 
 import numpy as np
 import numpy.typing as npt
 import pyarrow as pa
 from typing_extensions import Protocol
 
-_IntegerArray = Union[npt.NDArray[np.int64], pa.IntegerArray]
+IntegerArray = Union[npt.NDArray[np.int64], pa.IntegerArray]
 
 
 class IndexLike(Protocol):
@@ -19,12 +19,12 @@ class IndexLike(Protocol):
     not as a full specification of the types and behavior of ``get_indexer``.
     """
 
-    def get_indexer(self, target: _IntegerArray) -> Any:
+    def get_indexer(self, target: IntegerArray) -> npt.NDArray[np.intp]:
         """Something compatible with Pandas' Index.get_indexer method."""
 
 
-IndexFactory = Callable[[_IntegerArray], "IndexLike"]
-"""Function that builds an index over the given NDArray.
+IndexFactory = Callable[[IntegerArray], IndexLike]
+"""Function that builds an index over the given ``IntegerArray``.
 
 This interface is implemented by the callable ``pandas.Index``.
 """
diff --git a/python-spec/testing/test_query_axis.py b/python-spec/testing/test_query_axis.py
index e9d418f9..a235698d 100644
--- a/python-spec/testing/test_query_axis.py
+++ b/python-spec/testing/test_query_axis.py
@@ -1,13 +1,11 @@
 from typing import Any, Tuple
 
-import attrs
 import numpy as np
 import pytest
 from pytest import mark
 
 import somacore
 from somacore import options
-from somacore.query import query
 
 
 @mark.parametrize(
@@ -51,24 +49,3 @@ def test_canonicalization_nparray() -> None:
 def test_canonicalization_bad(coords) -> None:
     with pytest.raises(TypeError):
         somacore.AxisQuery(coords=coords)
-
-
-@attrs.define(frozen=True)
-class IHaveObsVarStuff:
-    obs: int
-    var: int
-    the_obs_suf: str
-    the_var_suf: str
-
-
-def test_axis_helpers() -> None:
-    thing = IHaveObsVarStuff(obs=1, var=2, the_obs_suf="observe", the_var_suf="vary")
-    assert 1 == query._Axis.OBS.getattr_from(thing)
-    assert 2 == query._Axis.VAR.getattr_from(thing)
-    assert "observe" == query._Axis.OBS.getattr_from(thing, pre="the_", suf="_suf")
-    assert "vary" == query._Axis.VAR.getattr_from(thing, pre="the_", suf="_suf")
-    ovdict = {"obs": "erve", "var": "y", "i_obscure": "hide", "i_varcure": "???"}
-    assert "erve" == query._Axis.OBS.getitem_from(ovdict)
-    assert "y" == query._Axis.VAR.getitem_from(ovdict)
-    assert "hide" == query._Axis.OBS.getitem_from(ovdict, pre="i_", suf="cure")
-    assert "???" == query._Axis.VAR.getitem_from(ovdict, pre="i_", suf="cure")