Skip to content

Commit

Permalink
kvstore push row sparse (apache#93)
Browse files Browse the repository at this point in the history
* Add multi-thread cpu elemwise sum for rsps

* Minor fix

* Add flag to switch between serial and multi-thread kvstore push

* Fix lint in sparse_ndarray.py

* Revert "Fix lint in sparse_ndarray.py"

This reverts commit d7225ec.

* Fix ndarray init in copy(ctx)

* Add env var to control the flow of serial/parallel reduce

* Refactor

* Fix copy ndarray bug

* Fix lint

* Refactor
  • Loading branch information
reminisce authored and Olivier committed Jun 15, 2017
1 parent 9d076b5 commit c32ba68
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 3 deletions.
52 changes: 52 additions & 0 deletions src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <random>
#include <thread>
#include <algorithm>
#include <functional>
#endif // DMLC_USE_CXX11

#include <dmlc/logging.h>
Expand Down Expand Up @@ -168,6 +169,57 @@ inline int GetExecNumMatchColor() {
return std::min(num_match_color, GetNumThreadPerGPU());
}

/*!
* \brief
* Helper function for ParallelSort.
* DO NOT call this function directly.
* Use the interface ParallelSort instead.
* Ref: https://github.com/dmlc/difacto/blob/master/src/common/parallel_sort.h
*/
template<typename RandomIt, typename Compare>
void ParallelSortHelper(RandomIt first, size_t len,
size_t grainsize, const Compare& comp) {
if (len < grainsize) {
std::sort(first, first+len, comp);
} else {
std::thread thr(ParallelSortHelper<RandomIt, Compare>, first, len/2, grainsize, comp);
ParallelSortHelper(first+len/2, len - len/2, grainsize, comp);
thr.join();
std::inplace_merge(first, first+len/2, first+len, comp);
}
}

/*!
* \brief
* Sort the elements in the range [first, last) into the ascending order defined by
* the comparator comp.
* If the length of the range [first, last) is greater than a certain threshold,
* the range will be recursively divided into two and assign two threads
* to sort each half range.
* Ref: https://github.com/dmlc/difacto/blob/master/src/common/parallel_sort.h
*/
template<typename RandomIt, typename Compare>
void ParallelSort(RandomIt first, RandomIt last, size_t num_threads, Compare comp) {
const auto num = std::distance(first, last);
size_t grainsize = std::max(num / num_threads + 5, static_cast<size_t>(1024*16));
ParallelSortHelper(first, num, grainsize, comp);
}

/*!
* \brief
* Sort the elements in the range [first, last) into ascending order.
* The elements are compared using the default < operator.
* If the length of the range [first, last) is greater than a certain threshold,
* the range will be recursively divided into two and assign two threads
* to sort each half range.
* Ref: https://github.com/dmlc/difacto/blob/master/src/common/parallel_sort.h
*/
template<typename RandomIt>
void ParallelSort(RandomIt first, RandomIt last, size_t num_threads) {
ParallelSort(first, last, num_threads,
std::less<typename std::iterator_traits<RandomIt>::value_type>());
}

/*!
* \brief Random Engine
*/
Expand Down
120 changes: 118 additions & 2 deletions src/kvstore/comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
*/
#ifndef MXNET_KVSTORE_COMM_H_
#define MXNET_KVSTORE_COMM_H_
#include <dmlc/omp.h>
#include <string>
#include <algorithm>
#include <utility>
#include <limits>
#include <vector>
#include <tuple>
#include <thread>
#include "mxnet/ndarray.h"
#include "../common/utils.h"
namespace mxnet {
namespace kvstore {
/**
Expand Down Expand Up @@ -65,6 +68,8 @@ class CommCPU : public Comm {
CommCPU() {
nthread_reduction_ = dmlc::GetEnv("MXNET_KVSTORE_REDUCTION_NTHREADS", 4);
bigarray_bound_ = dmlc::GetEnv("MXNET_KVSTORE_BIGARRAY_BOUND", 1000 * 1000);
// TODO(junwu) delete the following data member, now for benchmark only
is_serial_push_ = dmlc::GetEnv("MXNET_KVSTORE_SERIAL_PUSH", 0);
}
virtual ~CommCPU() { }

Expand Down Expand Up @@ -130,7 +135,8 @@ class CommCPU : public Comm {
auto result = buf.merged;
Engine::Get()->PushSync([reduce, result, this](RunContext rctx) {
NDArray out = result;
ReduceSumCPUEx(reduce, &out);
is_serial_push_?
ReduceSumCPUExSerial(reduce, &out) : ReduceSumCPUExParallel(reduce, &out);
}, Context::CPU(), const_vars, {result.var()},
FnProperty::kCPUPrioritized, priority, PROFILER_MESSAGE("KVStoreReduce"));
}
Expand Down Expand Up @@ -168,7 +174,7 @@ class CommCPU : public Comm {

// serial implementation of reduce sum for row sparse NDArray.
// TODO(haibin) use openmp kernel to parallelize the summation
inline void ReduceSumCPUEx(const std::vector<NDArray> &in, NDArray *out) {
inline void ReduceSumCPUExSerial(const std::vector<NDArray> &in, NDArray *out) {
using namespace rowsparse;
using namespace mshadow;
auto stype = out->storage_type();
Expand Down Expand Up @@ -239,6 +245,115 @@ class CommCPU : public Comm {
});
}

template<typename DType, typename IType>
void ReduceSumCPUExImpl(const std::vector<NDArray>& nds,
const std::vector<IType>& uniq_row_idx,
NDArray* out) {
#pragma omp parallel num_threads(nthread_reduction_)
{
const size_t nnr = uniq_row_idx.size();
const int num_threads = omp_get_num_threads();
size_t row_block_len = (nnr + num_threads - 1) / num_threads;
const size_t row_block_start = omp_get_thread_num() * row_block_len;
if (row_block_start < nnr) {
const size_t row_block_end = std::min(row_block_start+row_block_len, nnr);

auto out_values = out->data().FlatTo2D<cpu, DType>();
auto out_indices = out->aux_data(rowsparse::kIdx).FlatTo1D<cpu, IType>();
for (size_t i = row_block_start; i < row_block_end; ++i) {
out_indices[i] = uniq_row_idx[i];
}
for (const auto& nd : nds) {
if (nd.storage_initialized()) {
const auto nd_indices = nd.aux_data(rowsparse::kIdx).FlatTo1D<cpu, IType>();
const auto nd_values = nd.data().FlatTo2D<cpu, DType>();
const auto nd_num_rows = nd.aux_shape(rowsparse::kIdx).Size();
const IType* nd_indices_start = &nd_indices[0];
const IType* nd_indices_end = nd_indices_start + nd_num_rows;
const IType* row_idx_ptr = std::lower_bound(nd_indices_start, nd_indices_end,
out_indices[row_block_start]);
// skip this nd if all of its row indices are smaller than out_indices[row_block_start]
// or current row block is not covered by [*row_idx_ptr, nd_indices_end).
if (nd_indices_end == row_idx_ptr || *row_idx_ptr > out_indices[row_block_end-1]) {
continue;
}
for (size_t irow = row_block_start;
irow < row_block_end && row_idx_ptr != nd_indices_end;) {
if (out_indices[irow] == *row_idx_ptr) {
auto out_value_cur_row = out_values[irow];
const auto offset = row_idx_ptr - nd_indices_start;
auto nd_value_cur_row = nd_values[offset];
for (size_t j = 0; j < nd_value_cur_row.shape_[0]; ++j) {
out_value_cur_row[j] += nd_value_cur_row[j];
}
++irow;
++row_idx_ptr;
} else if (out_indices[irow] < *row_idx_ptr) {
++irow;
} else {
++row_idx_ptr;
}
}
}
}
}
}
}

/*!
* \brief Given a vector of ndarrays, generate a index vector containing
* all the unique row indices of the ndarrays.
*/
template<typename IType>
void GetUniqueRspRowIdx(const std::vector<NDArray>& nds,
std::vector<IType>* uniq_row_idx) {
using namespace rowsparse;
size_t total_num_rows = 0;
for (const auto& nd : nds) {
CHECK_EQ(nd.storage_type(), kRowSparseStorage);
if (nd.storage_initialized()) {
total_num_rows += nd.aux_shape(kIdx).Size();
}
}

uniq_row_idx->resize(total_num_rows);
int nthreads = omp_get_max_threads();
size_t offset = 0;
for (const auto& nd : nds) {
if (nd.storage_initialized()) {
const IType* nd_row_idx = nd.aux_data(kIdx).dptr<IType>();
const size_t num_rows = nd.aux_shape(kIdx).Size();
#pragma omp parallel for num_threads(nthreads)
for (size_t i = 0; i < num_rows; ++i) {
(*uniq_row_idx)[offset+i] = nd_row_idx[i];
}
offset += num_rows;
}
}

common::ParallelSort(uniq_row_idx->begin(), uniq_row_idx->end(), nthreads);
auto it = std::unique(uniq_row_idx->begin(), uniq_row_idx->end());
uniq_row_idx->resize(it - uniq_row_idx->begin());
}

void ReduceSumCPUExParallel(const std::vector<NDArray>& nds, NDArray* out) {
if (nds.empty()) return;
using namespace rowsparse;
CHECK_EQ(out->storage_type(), kRowSparseStorage)
<< "Expected row sparse storage type ("
<< out->storage_type() << " given)";

MSHADOW_TYPE_SWITCH(out->dtype(), DType, {
MSHADOW_INT_TYPE_SWITCH(out->aux_type(kIdx), IType, {
std::vector<IType> uniq_row_idx;
GetUniqueRspRowIdx(nds, &uniq_row_idx);
out->CheckAndAlloc({mshadow::Shape1(uniq_row_idx.size())});
out->data().FlatTo2D<cpu, DType>() = static_cast<DType>(0);
ReduceSumCPUExImpl<DType, IType>(nds, uniq_row_idx, out);
});
});
}

template<typename DType>
inline static void ReduceSumCPU(
const std::vector<DType*> &dptr, size_t offset, index_t size) {
Expand Down Expand Up @@ -304,6 +419,7 @@ class CommCPU : public Comm {
std::unordered_map<int, BufferEntry> merge_buf_;
size_t bigarray_bound_;
int nthread_reduction_;
bool is_serial_push_;
};

/**
Expand Down
11 changes: 10 additions & 1 deletion src/ndarray/ndarray.cc
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,16 @@ void NDArray::Load(dmlc::Stream* fi,
}

NDArray NDArray::Copy(Context ctx) const {
NDArray ret(shape(), ctx, true, dtype_);
NDArray ret;
if (kDefaultStorage == storage_type()) {
ret = NDArray(shape(), ctx, true, dtype_);
} else if (kUndefinedStorage != storage_type()) {
ret = NDArray(storage_type(), shape(), ctx, true, dtype_,
ptr_->aux_types, ptr_->aux_shapes, storage_shape());
} else {
LOG(FATAL) << "NDArray::Copy cannot copy undefined storage-type ndarray to ctx.dev_type="
<< ctx.dev_type << ", ctx.dev_id=" << ctx.dev_id;
}
CopyFromTo(*this, &ret);
return ret;
}
Expand Down

0 comments on commit c32ba68

Please sign in to comment.