[c++, python] fastercsx: improved duplicate coordinate handling (#3468) (#3485)

* improved duplicate coordinate handling

* invert sense of dup flag

* fix mishandled empty first fragment in compress_coo

* PR feedback

* fix race

Co-authored-by: Bruce Martin <[email protected]>
github-actions[bot] and bkmartinjr authored Dec 19, 2024
1 parent 89b0e7f commit 0da7f21
Showing 5 changed files with 115 additions and 59 deletions.
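
Taken together, the change threads a new duplicate-detection flag from the native sort into the Python CompressedMatrix, and to_scipy only takes its fast path (which bypasses SciPy's O(N) validation scan) when the structure is both sorted and duplicate-free. A minimal sketch of the new behavior follows; the import path and context construction are assumptions for illustration and are not part of this commit.

import numpy as np
import tiledbsoma
from tiledbsoma import _fastercsx as fastercsx  # assumed import path; the tests refer to it simply as fastercsx

context = tiledbsoma.SOMATileDBContext()

# COO input with a duplicate coordinate at (0, 0).
i = np.array([0, 0, 1], dtype=np.int64)
j = np.array([0, 0, 2], dtype=np.int64)
d = np.array([1, 2, 3], dtype=np.int8)

cm = fastercsx.CompressedMatrix.from_ijd(i, j, d, (2, 4), "csr", True, context)

# sort_csx_indices now reports whether duplicates were seen: is_sorted is True
# (make_sorted=True above) but no_duplicates is False, so to_scipy falls back
# to the validating scipy.sparse constructor instead of the fast path.
print(cm.is_sorted, cm.no_duplicates)
csr = cm.to_scipy()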
55 changes: 32 additions & 23 deletions apis/python/src/tiledbsoma/_fastercsx.py
@@ -29,7 +29,16 @@ class CompressedMatrix:
Export constructed matrix to SciPy (or comparable package) for use of the result.
"""

__slots__ = ("indptr", "indices", "data", "shape", "format", "sorted", "context")
__slots__ = (
"indptr",
"indices",
"data",
"shape",
"format",
"is_sorted",
"no_duplicates",
"context",
)

def __init__(
self,
@@ -38,7 +47,8 @@ def __init__(
data: NDArrayNumber,
shape: Tuple[int, int],
format: Literal["csc", "csr"],
sorted: bool,
is_sorted: bool,
no_duplicates: bool | None,
context: SOMATileDBContext,
) -> None:
"""Construct from PJV format. Not intended for direct use - use instead the
@@ -54,7 +64,8 @@ def __init__(
self.indices = indices
self.data = data
self.format = format
self.sorted = sorted
self.is_sorted = is_sorted
self.no_duplicates = no_duplicates
self.context = context

@staticmethod
@@ -86,25 +97,15 @@ def from_ijd(
compress_coo(
context.native_context, (n_major, n_minor), i, j, d, indptr, indices, data
)
no_duplicates = None # aka, unknown
if make_sorted:
sort_csx_indices(context.native_context, indptr, indices, data)
no_duplicates = sort_csx_indices(
context.native_context, indptr, indices, data
)
return CompressedMatrix(
indptr, indices, data, shape, format, make_sorted, context
indptr, indices, data, shape, format, make_sorted, no_duplicates, context
)

@staticmethod
def from_pjd(
p: NDArrayIndex,
j: NDArrayIndex,
d: NDArrayNumber,
shape: Tuple[int, int],
format: Literal["csc", "csr"],
make_sorted: bool,
context: SOMATileDBContext,
) -> CompressedMatrix:
"""Factory method accepting pointers, indices and data vectors."""
return CompressedMatrix(p, j, d, shape, format, make_sorted, context)

@staticmethod
def from_soma(
tables: pa.Table | Sequence[pa.Table],
@@ -156,12 +157,12 @@ def dtype(self) -> npt.DTypeLike:
def to_scipy(
self, index: slice | None = None
) -> scipy.sparse.csr_matrix | scipy.sparse.csc_matrix:
"""Extract a slice on the compressed dimension and return as a dense
"""Extract a slice on the compressed dimension and return as a
:class:`scipy.sparse.csr_matrix` or
:class:`scipy.sparse.csc_matrix`.
Optionally allows slicing on compressed dimension during conversion, in which case
an extra memory copy is avoided.
an extra memory copy is avoided for the SOMA fast path (no-dups).
"""
index = index or slice(None)
assert isinstance(index, slice)
@@ -194,7 +195,13 @@ def to_scipy(
else (self.shape[0], n_major)
)
return CompressedMatrix._to_scipy(
indptr, indices, data, shape, self.format, self.sorted
indptr,
indices,
data,
shape,
self.format,
self.is_sorted,
self.no_duplicates,
)

def to_numpy(self, index: slice | None = None) -> NDArrayNumber:
@@ -246,7 +253,8 @@ def _to_scipy(
data: NDArrayNumber,
shape: Tuple[int, int],
format: Literal["csc", "csr"],
sorted: bool,
is_sorted: bool,
no_duplicates: bool | None,
) -> scipy.sparse.csr_matrix | scipy.sparse.csc_matrix:
"""
This is to bypass the O(N) scan that :meth:`sparse._cs_matrix.__init__`
@@ -258,7 +266,8 @@
sparse.csr_matrix((data, indices, indptr), shape=shape)
(or the equivalent for csc_matrix)
"""
if sorted:
if is_sorted and bool(no_duplicates):
# Fast path for SOMA common case.
matrix = (
scipy.sparse.csr_matrix.__new__(scipy.sparse.csr_matrix)
if format == "csr"
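
For context on the fast path gated above: it skips SciPy's O(N) validation scan by filling a bare matrix instance directly, as the __new__ call in the hunk suggests. The sketch below is illustrative only; the private _shape attribute and the safety of bypassing __init__ are assumptions about current SciPy internals rather than anything this commit guarantees.

import scipy.sparse

def fast_csr(indptr, indices, data, shape):
    # Allocate an empty csr_matrix and assign its arrays directly, skipping
    # the O(nnz) validation done by scipy.sparse.csr_matrix.__init__.
    # Only safe when indices are already sorted and duplicate-free.
    m = scipy.sparse.csr_matrix.__new__(scipy.sparse.csr_matrix)
    m.data = data
    m.indices = indices
    m.indptr = indptr
    m._shape = shape
    return m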
13 changes: 9 additions & 4 deletions apis/python/src/tiledbsoma/fastercsx.cc
@@ -390,9 +390,12 @@ void compress_coo(

/**
* @brief Python binding for sort_csx_indices, which sorts minor dimension of a
* compressed matrix
* compressed matrix.
*
* Returns false if the matrix contains duplicate coordinates, true if all
* coordinates are unique.
*/
void sort_csx_indices(
bool sort_csx_indices(
std::shared_ptr<tiledbsoma::SOMAContext> ctx,
py::array Bp,
py::array Bj,
@@ -413,10 +416,10 @@ void sort_csx_indices(
value_type_dispatch, Bd.dtype(), "CSx data array");

// Dispatch by type
std::visit(
auto no_duplicates = std::visit(
[&](auto value_type,
auto csx_major_index_type,
auto csx_minor_index_type) {
auto csx_minor_index_type) -> bool {
using CSX_MAJOR_INDEX =
typename decltype(csx_major_index_type)::type;
using CSX_MINOR_INDEX =
@@ -445,6 +448,8 @@
value_type,
csx_major_index_type,
csx_minor_index_type);

return no_duplicates;
};

/**
39 changes: 22 additions & 17 deletions apis/python/tests/test_fastercsx.py
@@ -56,7 +56,7 @@ def assert_eq(sp: sparse.spmatrix, cm: fastercsx.CompressedMatrix) -> bool:
assert cm.indptr.shape == sp_csx.indptr.shape

assert np.array_equal(cm.indptr, sp_csx.indptr)
if cm.sorted:
if cm.is_sorted:
assert np.array_equal(cm.indices, sp_csx.indices)
assert np.array_equal(cm.data, sp_csx.data)

@@ -93,22 +93,6 @@ def test_from_ijd(
assert cm.to_scipy().has_canonical_format


def test_from_pjd(context: soma.SOMATileDBContext, rng: np.random.Generator) -> None:
sp = sparse.random(
9970, 3124, density=0.01, dtype=np.float32, random_state=rng
).tocsr()
cm = fastercsx.CompressedMatrix.from_pjd(
sp.indptr,
sp.indices,
sp.data,
sp.shape,
format="csr",
make_sorted=True,
context=context,
)
assert_eq(sp, cm)


@pytest.mark.parametrize("format", ["csr", "csc"])
@pytest.mark.parametrize("n_tables", [1, 2, 3])
@pytest.mark.parametrize(
Expand Down Expand Up @@ -361,3 +345,24 @@ def test_bad_shapes(context: soma.SOMATileDBContext, rng: np.random.Generator) -
fastercsx.CompressedMatrix.from_ijd(
sp.row, sp.col, sp.data, shp, "csr", True, context
)


@pytest.mark.parametrize("format", ["csr", "csc"])
@pytest.mark.parametrize("make_sorted", [True, False])
def test_duplicates(
format: str,
make_sorted: bool,
context: soma.SOMATileDBContext,
rng: np.random.Generator,
) -> None:
shape = (10, 10)
i = np.array([0, 0, 1, 1], dtype=np.int64)
j = np.array([0, 0, 1, 1], dtype=np.int64)
d = np.arange(len(i), dtype=np.int8)

cm = fastercsx.CompressedMatrix.from_ijd(
i, j, d, shape, format, make_sorted, context
)
assert (
sparse.coo_matrix((d, (i, j)), shape=shape).asformat(format) != cm.to_scipy()
).nnz == 0
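
The reference side of test_duplicates leans on a SciPy guarantee: converting a COO matrix with repeated coordinates to CSR/CSC sums the duplicates, and the fastercsx result must compare elementwise-equal to that summed matrix. A standalone illustration:

import numpy as np
from scipy import sparse

i = np.array([0, 0, 1, 1])
j = np.array([0, 0, 1, 1])
d = np.arange(4, dtype=np.int8)  # values 0, 1, 2, 3

ref = sparse.coo_matrix((d, (i, j)), shape=(10, 10)).asformat("csr")
# Duplicate (0, 0) and (1, 1) entries are summed during conversion.
print(ref[0, 0], ref[1, 1])  # 1 and 5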
34 changes: 31 additions & 3 deletions apis/python/tests/test_libfastercsx.py
@@ -85,7 +85,7 @@ def test_construction(
indices,
data,
)
fastercsx.sort_csx_indices(context, indptr, indices, data)
assert fastercsx.sort_csx_indices(context, indptr, indices, data)

# Verify equality with SciPy constructed CSR
csr = sp.tocsr()
@@ -144,7 +144,7 @@ def test_partitioning(
indices,
data,
)
fastercsx.sort_csx_indices(context, indptr, indices, data)
assert fastercsx.sort_csx_indices(context, indptr, indices, data)

# Verify equality with SciPy constructed CSR
csr = sp.tocsr()
@@ -194,7 +194,7 @@ def test_multichunk(
indices,
data,
)
fastercsx.sort_csx_indices(context, indptr, indices, data)
assert fastercsx.sort_csx_indices(context, indptr, indices, data)

# Verify equality with SciPy constructed CSR
csr = sp.tocsr()
@@ -334,3 +334,31 @@ def test_ragged_chunk_error(
indices,
data,
)


def test_empty_first_frag(rng: np.random.Generator, context: clib.SOMAContext) -> None:
fastercsx.compress_coo(
context,
shape := (56, 18),
(
np.array([], dtype=np.int32),
i := np.array([9, 35, 20, 7, 46, 41, 16, 28, 8, 19], dtype=np.int32),
),
(
np.array([], dtype=np.int32),
j := np.array([15, 14, 11, 9, 4, 0, 0, 5, 3, 1], dtype=np.int32),
),
(
np.array([], dtype=np.int8),
d := np.array([85, 63, 51, 26, 30, 4, 7, 1, 17, 81], dtype=np.int8),
),
indptr := np.zeros(57, dtype=np.int32),
indices := np.empty((10,), dtype=np.int32),
data := np.empty((10,), dtype=np.int8),
)

csr = sparse.csr_matrix((d, (i, j)), shape=shape)
assert (csr != sparse.csr_matrix((data, indices, indptr), shape=shape)).nnz == 0
assert np.array_equal(csr.indptr, indptr)
assert np.array_equal(csr.indices, indices)
assert np.array_equal(csr.data, data)
33 changes: 21 additions & 12 deletions libtiledbsoma/src/utils/fastercsx.h
@@ -30,6 +30,7 @@
* Fast construction of CSX from COO.
*/

#include <atomic>
#include <cmath>
#include <cstdint>
#include <numeric>
@@ -215,16 +216,15 @@ void count_rows_(
} else if (n_partitions > 0) {
// for a single partition, just accumulate directly into the output
// array
assert(Ai.size() == 1);
assert(
Ai[0].data() == partitions[0].views[0].data() &&
Ai[0].size() == partitions[0].views[0].size());
auto& Ai_view = Ai[0];
for (uint64_t n = 0; n < nnz; ++n) {
uint64_t row = Ai_view[n];
if ((row > n_row - 1) || (row < 0)) [[unlikely]]
throw std::out_of_range("Coordinate out of range.");
Bp[row]++;
for (auto& Ai_view : partitions[0].views) {
for (size_t n = 0; n < Ai_view.size(); n++) {
auto row = Ai_view[n];
if ((row < 0) ||
(static_cast<std::make_unsigned_t<COO_IDX>>(row) >= n_row))
[[unlikely]]
throw std::out_of_range("Coordinate out of range.");
Bp[row]++;
}
}
}
// else, zero length array
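
The rewritten single-partition path above no longer assumes the partition holds exactly one non-empty fragment; it walks every fragment view, which is what tolerates an empty first fragment. A rough Python equivalent of this counting pass, with illustrative names only:

import numpy as np

def count_rows(row_fragments, n_row, Bp):
    # Histogram the row coordinates of every COO fragment into Bp, rejecting
    # out-of-range values. Fragments may be empty, including the first one.
    for frag in row_fragments:
        if frag.size and (frag.min() < 0 or frag.max() >= n_row):
            raise IndexError("Coordinate out of range.")
        Bp[:n_row] += np.bincount(frag, minlength=n_row)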
@@ -435,9 +435,12 @@ bool index_lt_(

/**
* @brief In-place sort of minor axis, used to canonicalize the CSx ordering.
*
* Returns false if there are duplicate coordinates, true if all coordinates
* are unique.
*/
template <class VALUE, class CSX_MINOR_IDX, class CSX_MAJOR_IDX>
void sort_csx_indices(
bool sort_csx_indices(
ThreadPool* const tp,
uint64_t n_row,
uint64_t nnz,
@@ -448,8 +451,10 @@ void sort_csx_indices(
assert(Bj.size() == nnz);
assert(Bd.size() == nnz);

std::atomic<bool> no_duplicates(true);

auto status = parallel_for(
tp, 0ul, n_row, [&Bp, &Bj, &Bd, &nnz](uint64_t row) {
tp, 0ul, n_row, [&Bp, &Bj, &Bd, &nnz, &no_duplicates](uint64_t row) {
uint64_t idx_start = Bp[row];
uint64_t idx_end = Bp[row + 1];

@@ -468,12 +473,16 @@
for (uint64_t n = 0, idx = idx_start; idx < idx_end; ++n, ++idx) {
Bj[idx] = temp[n].first;
Bd[idx] = temp[n].second;

if (n > 0 && Bj[idx] == Bj[idx - 1])
no_duplicates = false;
}

return Status::Ok();
});

assert(status.ok());
return no_duplicates;
};

/**
Expand Down
