Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow overriding of CSR accumulators with an IndexLike #181

Merged
merged 1 commit into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 54 additions & 82 deletions python-spec/src/somacore/query/_fast_csr.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,74 @@
import numba.typed
import numpy as np
import numpy.typing as npt
import pandas as pd
import pyarrow as pa
from scipy import sparse

from .. import data as scd
from . import _eager_iter
from . import types


def read_scipy_csr(
matrix: scd.SparseNDArray, obs_joinids: pa.Array, var_joinids: pa.Array
) -> sparse.csr_matrix:
"""
Given a 2D SparseNDArray and joinids for the two dimensions, read the
slice and return it as an SciPy sparse.csr_matrix.
"""
data, indptr, indices, shape = _read_csr(matrix, obs_joinids, var_joinids)
csr = _create_scipy_csr_matrix(data, indices, indptr, shape=shape)
return csr
def read_csr(
matrix: scd.SparseNDArray,
obs_joinids: pa.Array,
var_joinids: pa.Array,
index_factory: types.IndexFactory,
) -> "AccumulatedCSR":
if not isinstance(matrix, scd.SparseNDArray) or matrix.ndim != 2:
raise TypeError("Can only read from a 2D SparseNDArray")

max_workers = (os.cpu_count() or 4) + 2
with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
acc = _CSRAccumulator(
obs_joinids=obs_joinids,
var_joinids=var_joinids,
pool=pool,
index_factory=index_factory,
)
for tbl in _eager_iter.EagerIterator(
matrix.read((obs_joinids, var_joinids)).tables(),
pool=pool,
):
acc.append(tbl["soma_dim_0"], tbl["soma_dim_1"], tbl["soma_data"])

def read_arrow_csr(
matrix: scd.SparseNDArray, obs_joinids: pa.Array, var_joinids: pa.Array
) -> pa.SparseCSRMatrix:
"""
Given a 2D SparseNDArray and joinids for the two dimensions, read the
slice and return it as a pyarrow.SparseCSRMatrix.
"""
data, indptr, indices, shape = _read_csr(matrix, obs_joinids, var_joinids)
csr = pa.SparseCSRMatrix.from_numpy(data, indptr, indices, shape=shape)
return csr
return acc.finalize()


class _CSRAccumulatorFinalResult(NamedTuple):
class AccumulatedCSR(NamedTuple):
"""
Private.

Return type for the _CSRAccumulator.finalize method.
Contains a sparse CSR consituent elements
Contains a sparse CSR's constituent elements.
"""

data: npt.NDArray[np.number]
indptr: npt.NDArray[np.integer]
indices: npt.NDArray[np.integer]
shape: Tuple[int, int]

def to_scipy(self) -> sparse.csr_matrix:
"""Create a Scipy sparse.csr_matrix from component elements.

Conceptually, this is identical to::

sparse.csr_matrix((data, indices, indptr), shape=shape)

This ugliness is to bypass the O(N) scan that
:meth:`sparse._cs_matrix.__init__`
does when a new compressed matrix is created.

See `SciPy bug 11496 <https://github.com/scipy/scipy/issues/11496>`
for details.
"""
matrix = sparse.csr_matrix.__new__(sparse.csr_matrix)
matrix.data = self.data
matrix.indptr = self.indptr
matrix.indices = self.indices
matrix._shape = self.shape
return matrix


class _CSRAccumulator:
"""
Expand All @@ -62,14 +85,15 @@ def __init__(
obs_joinids: npt.NDArray[np.int64],
var_joinids: npt.NDArray[np.int64],
pool: futures.Executor,
index_factory: types.IndexFactory,
):
self.obs_joinids = obs_joinids
self.var_joinids = var_joinids
self.pool = pool

self.shape: Tuple[int, int] = (len(self.obs_joinids), len(self.var_joinids))
self.obs_indexer: pd.Index = pd.Index(self.obs_joinids)
self.var_indexer: pd.Index = pd.Index(self.var_joinids)
self.obs_indexer = index_factory(self.obs_joinids)
self.var_indexer = index_factory(self.var_joinids)
self.row_length: npt.NDArray[np.int64] = np.zeros(
(self.shape[0],), dtype=_select_dtype(self.shape[1])
)
Expand All @@ -91,6 +115,7 @@ def append(
) -> None:
"""
At accumulation time, do several things:

* re-index to positional indices, and if possible, cast to smaller dtype
to minimize memory footprint (at cost of some amount of time)
* accumulate column counts by row, i.e., build the basis of the indptr
Expand All @@ -113,15 +138,15 @@ def append(
self.coo_chunks.append((row_ind, col_ind, data.to_numpy()))
_accum_row_length(self.row_length, row_ind)

def finalize(self) -> _CSRAccumulatorFinalResult:
def finalize(self) -> AccumulatedCSR:
nnz = sum(len(chunk[2]) for chunk in self.coo_chunks)
index_dtype = _select_dtype(nnz)
if nnz == 0:
# There is no way to infer matrix dtype, so use a default and return
# an empty matrix. Float32 is used as a default type, as it is most
# compatible with AnnData expectations.
empty = sparse.csr_matrix((0, 0), dtype=np.float32)
return _CSRAccumulatorFinalResult(
return AccumulatedCSR(
data=empty.data,
indptr=empty.indptr,
indices=empty.indices,
Expand Down Expand Up @@ -160,7 +185,7 @@ def finalize(self) -> _CSRAccumulatorFinalResult:
]
)
_finalize_indptr(indptr)
return _CSRAccumulatorFinalResult(
return AccumulatedCSR(
data=data, indptr=indptr, indices=indices, shape=self.shape
)

Expand Down Expand Up @@ -243,61 +268,8 @@ def _select_dtype(


def _reindex_and_cast(
index: pd.Index, ids: npt.NDArray[np.int64], target_dtype: npt.DTypeLike
index: types.IndexLike, ids: npt.NDArray[np.int64], target_dtype: npt.DTypeLike
) -> npt.NDArray[np.int64]:
return cast(
npt.NDArray[np.int64], index.get_indexer(ids).astype(target_dtype, copy=False)
)


def _read_csr(
matrix: scd.SparseNDArray, obs_joinids: pa.Array, var_joinids: pa.Array
) -> Tuple[
npt.NDArray[np.number], # data
npt.NDArray[np.integer], # indptr
npt.NDArray[np.integer], # indices
Tuple[int, int], # shape
]:
if not isinstance(matrix, scd.SparseNDArray) or matrix.ndim != 2:
raise TypeError("Can only read from a 2D SparseNDArray")

max_workers = (os.cpu_count() or 4) + 2
with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
acc = _CSRAccumulator(
obs_joinids=obs_joinids, var_joinids=var_joinids, pool=pool
)
for tbl in _eager_iter.EagerIterator(
matrix.read((obs_joinids, var_joinids)).tables(),
pool=pool,
):
acc.append(tbl["soma_dim_0"], tbl["soma_dim_1"], tbl["soma_data"])

data, indptr, indices, shape = acc.finalize()

return data, indptr, indices, shape


def _create_scipy_csr_matrix(
data: npt.NDArray[np.number],
indices: npt.NDArray[np.integer],
indptr: npt.NDArray[np.integer],
shape: Tuple[int, int],
) -> sparse.csr_matrix:
"""Create a Scipy sparse.csr_matrix from component elements.

Conceptually, this is identical to::

sparse.csr_matrix((data, indices, indptr), shape=shape)

This ugliness is to bypass the O(N) scan that
:meth:`scipy.sparse._cs_matrix.__init__`
does when a new compressed matrix is created.

See https://github.com/scipy/scipy/issues/11496 for details on the bug.
"""
matrix = sparse.csr_matrix.__new__(sparse.csr_matrix)
matrix.data = data
matrix.indices = indices
matrix.indptr = indptr
matrix._shape = shape
return matrix
18 changes: 14 additions & 4 deletions python-spec/src/somacore/query/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .. import options
from . import _fast_csr
from . import axis
from . import types

_RO_AUTO = options.ResultOrder.AUTO

Expand Down Expand Up @@ -88,6 +89,7 @@ def __init__(
*,
obs_query: axis.AxisQuery = axis.AxisQuery(),
var_query: axis.AxisQuery = axis.AxisQuery(),
index_factory: types.IndexFactory = pd.Index,
):
if measurement_name not in experiment.ms:
raise ValueError("Measurement does not exist in the experiment")
Expand All @@ -97,7 +99,11 @@ def __init__(

self._matrix_axis_query = _MatrixAxisQuery(obs=obs_query, var=var_query)
self._joinids = _JoinIDCache(self)
self._indexer = AxisIndexer(self)
self._indexer = AxisIndexer(
self,
index_factory=index_factory,
)
self._index_factory = index_factory
self._threadpool_: Optional[futures.ThreadPoolExecutor] = None

def obs(
Expand Down Expand Up @@ -393,9 +399,12 @@ def _read_axis_mappings(fn, axis, keys: Sequence[str]) -> Dict[str, np.ndarray]:
obs_table, var_table = self._read_both_axes(column_names)

x_matrices = {
_xname: _fast_csr.read_scipy_csr(
all_x_arrays[_xname], self.obs_joinids(), self.var_joinids()
)
_xname: _fast_csr.read_csr(
all_x_arrays[_xname],
self.obs_joinids(),
self.var_joinids(),
index_factory=self._index_factory,
).to_scipy()
for _xname in all_x_arrays
}

Expand Down Expand Up @@ -736,6 +745,7 @@ class AxisIndexer:
"""

query: ExperimentAxisQuery
_index_factory: types.IndexFactory
_cached_obs: Optional[pd.Index] = None
_cached_var: Optional[pd.Index] = None

Expand Down
33 changes: 33 additions & 0 deletions python-spec/src/somacore/query/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Common types used across SOMA query modules."""

from typing import Any, Callable

import numpy as np
import numpy.typing as npt
from typing_extensions import Protocol


class IndexLike(Protocol):
"""The basics of what we expect an Index to be.

This is a basic description of the parts of the ``pandas.Index`` type
that we use. It is intended as a rough guide so an implementor can know
that they are probably passing the right "index" type into a function,
not as a full specification of the types and behavior of ``get_indexer``.
"""

def get_indexer(
self,
target: npt.NDArray[np.int64],
method: object = ...,
limit: object = ...,
tolerance: object = ...,
) -> Any:
"""Something compatible with Pandas' Index.get_indexer method."""


IndexFactory = Callable[[npt.NDArray[np.int64]], "IndexLike"]
"""Function that builds an index over the given NDArray.

This interface is implemented by the callable ``pandas.Index``.
"""