diff --git a/benchmark/python/sparse_end2end.py b/benchmark/python/sparse_end2end.py index 986854de552b..62a3b77b8482 100644 --- a/benchmark/python/sparse_end2end.py +++ b/benchmark/python/sparse_end2end.py @@ -93,15 +93,18 @@ def row_sparse_pull(kv, key, data, slices, weight_array, priority): # the weights to each context # column indices (NDArray type) of the csr data # used as the row_idx of the weight row-sparse matrix - # TODO(junwu): - # the following two lines block, may need to precompute - # them and cache them outside the for loop row_indices = data.indices - indptr = data.indptr.asnumpy() - row_idx_array = [] - for s in slices: - row_idx_array.append(row_indices[indptr[s.start]:indptr[s.stop]]) - kv.row_sparse_pull(key, weight_array, priority=priority, row_ids=row_idx_array) + if len(slices) == 1: + kv.row_sparse_pull(key, weight_array, priority=priority, row_ids=row_indices) + else: # more than one slices, multi-GPU training. Need to retain weight rows according to data slices + # TODO(junwu): + # the following line blocks, may need to pre-compute + # and cache it outside the for loop + indptr = data.indptr.asnumpy() + row_idx_array = [] + for s in slices: + row_idx_array.append(row_indices[indptr[s.start]:indptr[s.stop]]) + kv.row_sparse_pull(key, weight_array, priority=priority, row_ids=row_idx_array) if __name__ == '__main__': diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index ea01ef99b320..cd0d3ab02825 100644 --- a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -209,7 +209,8 @@ class CommCPU : public Comm { << "BroadcastRowSparse with row_indices on gpu context not supported"; // retain according to unique indices const bool use_sparse_retain = (src.shape()[0] != src.storage_shape()[0]) - || (row_id.dtype() != out->aux_type(rowsparse::kIdx)); + || (row_id.dtype() != out->aux_type(rowsparse::kIdx)) + || (out->ctx().dev_mask() != Context::kGPU); if (use_sparse_retain) { // use sparse_retain op const bool is_to_gpu = out->ctx().dev_mask() == Context::kGPU; NDArray out_cpu = is_to_gpu? NDArray(kRowSparseStorage, src.shape(), diff --git a/src/operator/tensor/sparse_retain-inl.h b/src/operator/tensor/sparse_retain-inl.h index b6f684f846af..5add57c83b24 100644 --- a/src/operator/tensor/sparse_retain-inl.h +++ b/src/operator/tensor/sparse_retain-inl.h @@ -209,6 +209,25 @@ struct SparseRetainCopyIndices { } }; +/*! + * Copy input retained rows to output rows. + * Only used when input rsp is dense. + * This kernel is only used when ctx is on GPU. + * So it's parallelized by out_rows' elements, + * instead of rows. + * For CPU ctx, we simply call mshadow::Copy. + */ +struct SparseRetainCopyRetainedRowsFromDns { + template + MSHADOW_XINLINE static void Map(int i, DType* out_rows, const DType* in_rows, + const RType* in_row_idx, const IType* idx, + const size_t row_length) { + const size_t irow = i / row_length; + const size_t icol = i % row_length; + out_rows[i] = in_rows[static_cast(idx[irow]) * row_length + icol]; + } +}; + template void SparseRetainOpForwardRspImpl(mshadow::Stream *s, const NDArray& input_nd, @@ -255,14 +274,21 @@ void SparseRetainOpForwardRspImpl(mshadow::Stream *s, output_idx.dptr(), idx_data.dptr()); } // copy data - const Tensor input_tensor = - input_data.get_with_shape(Shape2(input_data.shape_[0], row_length), s); - Tensor output_tensor = - output_data.get_with_shape(Shape2(output_data.shape_[0], row_length), s); - for (size_t i = 0; i < num_rows_retained; ++i) { - Copy(output_tensor[i], input_tensor[output_idx_tensor[i]], s); + if (std::is_same::value) { // For cpu, we can access output_idx_tensor[i] + const Tensor input_tensor = + input_data.get_with_shape(Shape2(input_data.shape_[0], row_length), s); + Tensor output_tensor = + output_data.get_with_shape(Shape2(output_data.shape_[0], row_length), + s); + for (size_t i = 0; i < num_rows_retained; ++i) { + Copy(output_tensor[i], input_tensor[output_idx_tensor[i]], s); + } + } else { // For gpu, have to kernel launch + Kernel::Launch(s, output_data.Size(), + output_data.dptr(), input_data.dptr(), input_idx.dptr(), + idx_data.dptr(), row_length); } - } else { + } else { // input rsp is not dense Kernel::Launch(s, idx_data.Size(), output_data.dptr(), output_idx.dptr(), input_data.dptr(), input_idx.dptr(), idx_data.dptr(), input_data.shape_[0], row_length); diff --git a/tests/python/gpu/test_kvstore_gpu.py b/tests/python/gpu/test_kvstore_gpu.py index 2a21204e2a69..6d3ba989a84f 100644 --- a/tests/python/gpu/test_kvstore_gpu.py +++ b/tests/python/gpu/test_kvstore_gpu.py @@ -20,7 +20,7 @@ def init_kv_with_str(stype='default'): def test_row_sparse_pull(): kv = init_kv_with_str('row_sparse') - kv.init('e', mx.nd.ones(shape)._to_rsp()) + kv.init('e', mx.nd.ones(shape).tostype('row_sparse')) def check_row_sparse_pull(kv, count, ctx=default_context()): num_rows = shape[0] @@ -28,7 +28,7 @@ def check_row_sparse_pull(kv, count, ctx=default_context()): row_ids = [] all_row_ids = np.arange(num_rows) for i in range(count): - vals.append(mx.nd.zeros(shape, ctx=ctx)._to_rsp()) + vals.append(mx.nd.zeros(shape, ctx=ctx).tostype('row_sparse')) row_id = np.random.randint(num_rows, size=num_rows) row_ids.append(mx.nd.array(row_id, dtype='int64')) row_ids_to_pull = row_ids[0] if len(row_ids) == 1 else row_ids