From 0da7f21fc19fbaf02511e8dec3a6411eb0a6946f Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 14:38:16 -0500 Subject: [PATCH] [c++, python] `fastercsx`: improved duplicate coordinate handling (#3468) (#3485) * improved duplicate coordinate handling * invert sense of dup flag * fix mishandled empty first fragment in compress_coo * pr fb * fix race Co-authored-by: Bruce Martin --- apis/python/src/tiledbsoma/_fastercsx.py | 55 ++++++++++++++---------- apis/python/src/tiledbsoma/fastercsx.cc | 13 ++++-- apis/python/tests/test_fastercsx.py | 39 +++++++++-------- apis/python/tests/test_libfastercsx.py | 34 +++++++++++++-- libtiledbsoma/src/utils/fastercsx.h | 33 ++++++++------ 5 files changed, 115 insertions(+), 59 deletions(-) diff --git a/apis/python/src/tiledbsoma/_fastercsx.py b/apis/python/src/tiledbsoma/_fastercsx.py index bc593e9004..32f08b12ee 100644 --- a/apis/python/src/tiledbsoma/_fastercsx.py +++ b/apis/python/src/tiledbsoma/_fastercsx.py @@ -29,7 +29,16 @@ class CompressedMatrix: Export constructed matrix to SciPy (or comparable package) for use of the result. """ - __slots__ = ("indptr", "indices", "data", "shape", "format", "sorted", "context") + __slots__ = ( + "indptr", + "indices", + "data", + "shape", + "format", + "is_sorted", + "no_duplicates", + "context", + ) def __init__( self, @@ -38,7 +47,8 @@ def __init__( data: NDArrayNumber, shape: Tuple[int, int], format: Literal["csc", "csr"], - sorted: bool, + is_sorted: bool, + no_duplicates: bool | None, context: SOMATileDBContext, ) -> None: """Construct from PJV format. Not intended for direct use - use instead the @@ -54,7 +64,8 @@ def __init__( self.indices = indices self.data = data self.format = format - self.sorted = sorted + self.is_sorted = is_sorted + self.no_duplicates = no_duplicates self.context = context @staticmethod @@ -86,25 +97,15 @@ def from_ijd( compress_coo( context.native_context, (n_major, n_minor), i, j, d, indptr, indices, data ) + no_duplicates = None # aka, unknown if make_sorted: - sort_csx_indices(context.native_context, indptr, indices, data) + no_duplicates = sort_csx_indices( + context.native_context, indptr, indices, data + ) return CompressedMatrix( - indptr, indices, data, shape, format, make_sorted, context + indptr, indices, data, shape, format, make_sorted, no_duplicates, context ) - @staticmethod - def from_pjd( - p: NDArrayIndex, - j: NDArrayIndex, - d: NDArrayNumber, - shape: Tuple[int, int], - format: Literal["csc", "csr"], - make_sorted: bool, - context: SOMATileDBContext, - ) -> CompressedMatrix: - """Factory method accepting pointers, indices and data vectors.""" - return CompressedMatrix(p, j, d, shape, format, make_sorted, context) - @staticmethod def from_soma( tables: pa.Table | Sequence[pa.Table], @@ -156,12 +157,12 @@ def dtype(self) -> npt.DTypeLike: def to_scipy( self, index: slice | None = None ) -> scipy.sparse.csr_matrix | scipy.sparse.csc_matrix: - """Extract a slice on the compressed dimension and return as a dense + """Extract a slice on the compressed dimension and return as a :class:`scipy.sparse.csr_matrix` or :class:`scipy.sparse.csc_matrix`. Optionally allows slicing on compressed dimension during conversion, in which case - an extra memory copy is avoided. + an extra memory copy is avoided for the SOMA fast path (no-dups). """ index = index or slice(None) assert isinstance(index, slice) @@ -194,7 +195,13 @@ def to_scipy( else (self.shape[0], n_major) ) return CompressedMatrix._to_scipy( - indptr, indices, data, shape, self.format, self.sorted + indptr, + indices, + data, + shape, + self.format, + self.is_sorted, + self.no_duplicates, ) def to_numpy(self, index: slice | None = None) -> NDArrayNumber: @@ -246,7 +253,8 @@ def _to_scipy( data: NDArrayNumber, shape: Tuple[int, int], format: Literal["csc", "csr"], - sorted: bool, + is_sorted: bool, + no_duplicates: bool | None, ) -> scipy.sparse.csr_matrix | scipy.sparse.csc_matrix: """ This is to bypass the O(N) scan that :meth:`sparse._cs_matrix.__init__` @@ -258,7 +266,8 @@ def _to_scipy( sparse.csr_matrix((data, indices, indptr), shape=shape) (or the equivalent for csc_matrix) """ - if sorted: + if is_sorted and bool(no_duplicates): + # Fast path for SOMA common case. matrix = ( scipy.sparse.csr_matrix.__new__(scipy.sparse.csr_matrix) if format == "csr" diff --git a/apis/python/src/tiledbsoma/fastercsx.cc b/apis/python/src/tiledbsoma/fastercsx.cc index 5d96e8ab75..e1de4f901b 100644 --- a/apis/python/src/tiledbsoma/fastercsx.cc +++ b/apis/python/src/tiledbsoma/fastercsx.cc @@ -390,9 +390,12 @@ void compress_coo( /** * @brief Python binding for sort_csx_indices, which sorts minor dimension of a - * compressed matrix + * compressed matrix. + * + * Returns false if the matrix contains duplicate coordinates, true if all + * coordinates are unique. */ -void sort_csx_indices( +bool sort_csx_indices( std::shared_ptr ctx, py::array Bp, py::array Bj, @@ -413,10 +416,10 @@ void sort_csx_indices( value_type_dispatch, Bd.dtype(), "CSx data array"); // Dispatch by type - std::visit( + auto no_duplicates = std::visit( [&](auto value_type, auto csx_major_index_type, - auto csx_minor_index_type) { + auto csx_minor_index_type) -> bool { using CSX_MAJOR_INDEX = typename decltype(csx_major_index_type)::type; using CSX_MINOR_INDEX = @@ -445,6 +448,8 @@ void sort_csx_indices( value_type, csx_major_index_type, csx_minor_index_type); + + return no_duplicates; }; /** diff --git a/apis/python/tests/test_fastercsx.py b/apis/python/tests/test_fastercsx.py index 8d44f055d4..6dec5145da 100644 --- a/apis/python/tests/test_fastercsx.py +++ b/apis/python/tests/test_fastercsx.py @@ -56,7 +56,7 @@ def assert_eq(sp: sparse.spmatrix, cm: fastercsx.CompressedMatrix) -> bool: assert cm.indptr.shape == sp_csx.indptr.shape assert np.array_equal(cm.indptr, sp_csx.indptr) - if cm.sorted: + if cm.is_sorted: assert np.array_equal(cm.indices, sp_csx.indices) assert np.array_equal(cm.data, sp_csx.data) @@ -93,22 +93,6 @@ def test_from_ijd( assert cm.to_scipy().has_canonical_format -def test_from_pjd(context: soma.SOMATileDBContext, rng: np.random.Generator) -> None: - sp = sparse.random( - 9970, 3124, density=0.01, dtype=np.float32, random_state=rng - ).tocsr() - cm = fastercsx.CompressedMatrix.from_pjd( - sp.indptr, - sp.indices, - sp.data, - sp.shape, - format="csr", - make_sorted=True, - context=context, - ) - assert_eq(sp, cm) - - @pytest.mark.parametrize("format", ["csr", "csc"]) @pytest.mark.parametrize("n_tables", [1, 2, 3]) @pytest.mark.parametrize( @@ -361,3 +345,24 @@ def test_bad_shapes(context: soma.SOMATileDBContext, rng: np.random.Generator) - fastercsx.CompressedMatrix.from_ijd( sp.row, sp.col, sp.data, shp, "csr", True, context ) + + +@pytest.mark.parametrize("format", ["csr", "csc"]) +@pytest.mark.parametrize("make_sorted", [True, False]) +def test_duplicates( + format: str, + make_sorted: bool, + context: soma.SOMATileDBContext, + rng: np.random.Generator, +) -> None: + shape = (10, 10) + i = np.array([0, 0, 1, 1], dtype=np.int64) + j = np.array([0, 0, 1, 1], dtype=np.int64) + d = np.arange(len(i), dtype=np.int8) + + cm = fastercsx.CompressedMatrix.from_ijd( + i, j, d, shape, format, make_sorted, context + ) + assert ( + sparse.coo_matrix((d, (i, j)), shape=shape).asformat(format) != cm.to_scipy() + ).nnz == 0 diff --git a/apis/python/tests/test_libfastercsx.py b/apis/python/tests/test_libfastercsx.py index 8a954d9470..6e949a9916 100644 --- a/apis/python/tests/test_libfastercsx.py +++ b/apis/python/tests/test_libfastercsx.py @@ -85,7 +85,7 @@ def test_construction( indices, data, ) - fastercsx.sort_csx_indices(context, indptr, indices, data) + assert fastercsx.sort_csx_indices(context, indptr, indices, data) # Verify equality with SciPy constructed CSR csr = sp.tocsr() @@ -144,7 +144,7 @@ def test_partitioning( indices, data, ) - fastercsx.sort_csx_indices(context, indptr, indices, data) + assert fastercsx.sort_csx_indices(context, indptr, indices, data) # Verify equality with SciPy constructed CSR csr = sp.tocsr() @@ -194,7 +194,7 @@ def test_multichunk( indices, data, ) - fastercsx.sort_csx_indices(context, indptr, indices, data) + assert fastercsx.sort_csx_indices(context, indptr, indices, data) # Verify equality with SciPy constructed CSR csr = sp.tocsr() @@ -334,3 +334,31 @@ def test_ragged_chunk_error( indices, data, ) + + +def test_empty_first_frag(rng: np.random.Generator, context: clib.SOMAContext) -> None: + fastercsx.compress_coo( + context, + shape := (56, 18), + ( + np.array([], dtype=np.int32), + i := np.array([9, 35, 20, 7, 46, 41, 16, 28, 8, 19], dtype=np.int32), + ), + ( + np.array([], dtype=np.int32), + j := np.array([15, 14, 11, 9, 4, 0, 0, 5, 3, 1], dtype=np.int32), + ), + ( + np.array([], dtype=np.int8), + d := np.array([85, 63, 51, 26, 30, 4, 7, 1, 17, 81], dtype=np.int8), + ), + indptr := np.zeros(57, dtype=np.int32), + indices := np.empty((10,), dtype=np.int32), + data := np.empty((10,), dtype=np.int8), + ) + + csr = sparse.csr_matrix((d, (i, j)), shape=shape) + assert (csr != sparse.csr_matrix((data, indices, indptr), shape=shape)).nnz == 0 + assert np.array_equal(csr.indptr, indptr) + assert np.array_equal(csr.indices, indices) + assert np.array_equal(csr.data, data) diff --git a/libtiledbsoma/src/utils/fastercsx.h b/libtiledbsoma/src/utils/fastercsx.h index 6e4f6bc60a..29a5b36226 100644 --- a/libtiledbsoma/src/utils/fastercsx.h +++ b/libtiledbsoma/src/utils/fastercsx.h @@ -30,6 +30,7 @@ * Fast construction of CSX from COO. */ +#include #include #include #include @@ -215,16 +216,15 @@ void count_rows_( } else if (n_partitions > 0) { // for a single partition, just accumulate directly into the output // array - assert(Ai.size() == 1); - assert( - Ai[0].data() == partitions[0].views[0].data() && - Ai[0].size() == partitions[0].views[0].size()); - auto& Ai_view = Ai[0]; - for (uint64_t n = 0; n < nnz; ++n) { - uint64_t row = Ai_view[n]; - if ((row > n_row - 1) || (row < 0)) [[unlikely]] - throw std::out_of_range("Coordinate out of range."); - Bp[row]++; + for (auto& Ai_view : partitions[0].views) { + for (size_t n = 0; n < Ai_view.size(); n++) { + auto row = Ai_view[n]; + if ((row < 0) || + (static_cast>(row) >= n_row)) + [[unlikely]] + throw std::out_of_range("Coordinate out of range."); + Bp[row]++; + } } } // else, zero length array @@ -435,9 +435,12 @@ bool index_lt_( /** * @brief In-place sort of minor axis, used to canonicalize the CSx ordering. + * + * Returns false if there are duplicate coordinates, true if all coordinates + * are unique. */ template -void sort_csx_indices( +bool sort_csx_indices( ThreadPool* const tp, uint64_t n_row, uint64_t nnz, @@ -448,8 +451,10 @@ void sort_csx_indices( assert(Bj.size() == nnz); assert(Bd.size() == nnz); + std::atomic no_duplicates(true); + auto status = parallel_for( - tp, 0ul, n_row, [&Bp, &Bj, &Bd, &nnz](uint64_t row) { + tp, 0ul, n_row, [&Bp, &Bj, &Bd, &nnz, &no_duplicates](uint64_t row) { uint64_t idx_start = Bp[row]; uint64_t idx_end = Bp[row + 1]; @@ -468,12 +473,16 @@ void sort_csx_indices( for (uint64_t n = 0, idx = idx_start; idx < idx_end; ++n, ++idx) { Bj[idx] = temp[n].first; Bd[idx] = temp[n].second; + + if (n > 0 && Bj[idx] == Bj[idx - 1]) + no_duplicates = false; } return Status::Ok(); }); assert(status.ok()); + return no_duplicates; }; /**