kvstore.row_sparse_pull for GPU and end-to-end benchmark: CPU vs. multi-GPUs #150

Merged
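This PR adds `kvstore.row_sparse_pull`, which pulls only the rows of a row_sparse weight that the current mini-batch actually touches into each device, instead of the full parameter. A minimal usage sketch, adapted from the benchmark script below; the key 'w', the 100x4 shape, and the row ids are illustrative only, and it assumes an MXNet build with the sparse ndarray API:

import mxnet as mx

kv = mx.kvstore.create('local')
weight = mx.nd.zeros((100, 4), stype='row_sparse')
kv.init('w', weight)

# single device: pull only the rows referenced by this batch
row_ids = mx.nd.array([1, 5, 42], dtype='int64')
kv.row_sparse_pull('w', out=weight, row_ids=row_ids)

# multiple devices: one destination and one row-id array per device,
# so each device receives only the rows its data slice needs
weights = [mx.nd.zeros((100, 4), stype='row_sparse') for _ in range(2)]
row_ids_per_dev = [mx.nd.array([1, 5], dtype='int64'),
                   mx.nd.array([42, 7], dtype='int64')]
kv.row_sparse_pull('w', out=weights, row_ids=row_ids_per_dev)

The benchmark below drives exactly this pattern: the row ids come from the column indices of the CSR data batch, so only the weight rows used by the batch move between the kvstore and the devices.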
226 changes: 226 additions & 0 deletions benchmark/python/sparse_end2end.py
@@ -0,0 +1,226 @@
from mxnet.test_utils import *
import time
import argparse
import os

parser = argparse.ArgumentParser(description="Run sparse linear regression " \
                                             "with distributed kvstore",
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--profiler', type=int, default=0,
                    help='whether to use profiler')
parser.add_argument('--num-epoch', type=int, default=1,
                    help='number of epochs to train')
parser.add_argument('--batch-size', type=int, default=512,
                    help='number of examples per batch')
parser.add_argument('--num-batch', type=int, default=99999999,
                    help='number of batches per epoch')
parser.add_argument('--dummy-iter', type=int, default=0,
                    help='whether to use dummy iterator to exclude io cost')
parser.add_argument('--kvstore', type=str, default='local',
                    help='what kvstore to use [local, dist_sync, etc]')
parser.add_argument('--log-level', type=str, default='debug',
                    help='logging level [debug, info, error]')
parser.add_argument('--dataset', type=str, default='avazu',
                    help='what test dataset to use')
parser.add_argument('--num-gpu', type=int, default=0,
                    help='number of gpus to use. 0 means using cpu(0);'
                         'otherwise, use gpu(0),...,gpu(num_gpu-1)')
parser.add_argument('--output-dim', type=int, default=4,
                    help='number of columns of the forward output')


def get_libsvm_data(data_dir, data_name, url, data_origin_name):
    if not os.path.isdir(data_dir):
        os.system("mkdir " + data_dir)
    os.chdir(data_dir)
    if (not os.path.exists(data_name)):
        import urllib
        zippath = os.path.join(data_dir, data_origin_name)
        urllib.urlretrieve(url, zippath)
        os.system("bzip2 -d %r" % data_origin_name)
    os.chdir("..")


class DummyIter(mx.io.DataIter):
    "A dummy iterator that always returns the same batch, used for speed testing"
    def __init__(self, real_iter):
        super(DummyIter, self).__init__()
        self.real_iter = real_iter
        self.provide_data = real_iter.provide_data
        self.provide_label = real_iter.provide_label
        self.batch_size = real_iter.batch_size

        for batch in real_iter:
            self.the_batch = batch
            break

    def __iter__(self):
        return self

    def next(self):
        return self.the_batch

# testing dataset sources
avazu = {
    'data_name': 'avazu-app.t',
    'data_origin_name': 'avazu-app.t.bz2',
    'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/avazu-app.t.bz2",
    'feature_dim': 1000000,
}

kdda = {
    'data_name': 'kdda.t',
    'data_origin_name': 'kdda.t.bz2',
    'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2",
    'feature_dim': 20216830,
}

datasets = { 'kdda' : kdda, 'avazu' : avazu }


def get_sym(feature_dim):
    x = mx.symbol.Variable("data", stype='csr')
    norm_init = mx.initializer.Normal(sigma=0.01)
    w = mx.symbol.Variable("w", shape=(feature_dim, args.output_dim), init=norm_init, stype='row_sparse')
    embed = mx.symbol.dot(x, w)
    y = mx.symbol.Variable("softmax_label")
    model = mx.symbol.SoftmaxOutput(data=embed, label=y, name="out")
    return model


def row_sparse_pull(kv, key, data, slices, weight_array, priority):
    # if a kvstore is used, pull the rows of the weight matrix that correspond
    # to this batch into each context; the column indices (an NDArray) of the
    # csr data batch serve as the row_idx of the row-sparse weight matrix
    row_indices = data.indices
    if len(slices) == 1:
        kv.row_sparse_pull(key, weight_array, priority=priority, row_ids=row_indices)
    else:  # more than one slice: multi-GPU training; retain weight rows according to the data slices
        # TODO(junwu):
        # the following line blocks, may need to pre-compute
        # and cache it outside the for loop
        indptr = data.indptr.asnumpy()
        row_idx_array = []
        for s in slices:
            row_idx_array.append(row_indices[indptr[s.start]:indptr[s.stop]])
        kv.row_sparse_pull(key, weight_array, priority=priority, row_ids=row_idx_array)


if __name__ == '__main__':

    # arg parser
    args = parser.parse_args()
    num_epoch = args.num_epoch
    num_batch = args.num_batch
    kvstore = args.kvstore
    profiler = args.profiler > 0
    batch_size = args.batch_size if args.num_gpu == 0 else args.num_gpu * args.batch_size
    dummy_iter = args.dummy_iter
    dataset = args.dataset
    log_level = args.log_level
    contexts = mx.context.cpu(0) if args.num_gpu < 1\
               else [mx.context.gpu(i) for i in range(args.num_gpu)]

    # create kvstore when there are gpus
    kv = mx.kvstore.create(kvstore) if args.num_gpu >= 1 else None
    rank = kv.rank if kv is not None else 0
    num_worker = kv.num_workers if kv is not None else 1

    # only print log for rank 0 worker
    import logging
    if rank != 0:
        log_level = logging.ERROR
    elif log_level == 'DEBUG':
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    head = '%(asctime)-15s %(message)s'
    logging.basicConfig(level=log_level, format=head)

    # dataset
    assert(dataset in datasets), "unknown dataset " + dataset
    metadata = datasets[dataset]
    feature_dim = metadata['feature_dim']
    if logging:
        logging.debug('preparing data ... ')
    data_dir = os.path.join(os.getcwd(), 'data')
    path = os.path.join(data_dir, metadata['data_name'])
    if not os.path.exists(path):
        get_libsvm_data(data_dir, metadata['data_name'], metadata['url'],
                        metadata['data_origin_name'])
        assert os.path.exists(path)

    # data iterator
    train_data = mx.io.LibSVMIter(data_libsvm=path, data_shape=(feature_dim,),
                                  batch_size=batch_size, num_parts=num_worker,
                                  part_index=rank)
    if dummy_iter:
        train_data = DummyIter(train_data)

    # model
    model = get_sym(feature_dim)

    # module
    mod = mx.mod.Module(symbol=model, data_names=['data'],
                        label_names=['softmax_label'], context=contexts)
    mod.bind(data_shapes=train_data.provide_data, label_shapes=train_data.provide_label)
    mod.init_params(initializer=mx.init.Uniform(scale=.1))
    sgd = mx.optimizer.SGD(momentum=0.0, clip_gradient=5.0,
                           learning_rate=0.1, rescale_grad=1.0/batch_size/num_worker)
    mod.init_optimizer(optimizer=sgd, kvstore=kv)
    # use accuracy as the metric
    metric = mx.metric.create('acc')

    index = mod._exec_group.param_names.index('w')
    # weight_array bound to executors of the contexts
    weight_array = mod._exec_group.param_arrays[index]

    # start profiler
    if profiler:
        device = 'cpu'
        if args.num_gpu > 0:
            device = 'gpu' + str(args.num_gpu)
        name = 'profile_' + args.dataset + '_' + device + '_nworker' + str(num_worker)\
               + '_batchsize' + str(args.batch_size) + '_outdim' + str(args.output_dim) + '.json'
        mx.profiler.profiler_set_config(mode='all', filename=name)
        mx.profiler.profiler_set_state('run')

    logging.debug('start training ...')
    start = time.time()
    data_iter = iter(train_data)
    for epoch in range(num_epoch):
        nbatch = 0
        end_of_batch = False
        data_iter.reset()
        metric.reset()
        next_batch = next(data_iter)
        if kv is not None:
            row_sparse_pull(kv, 'w', next_batch.data[0], mod._exec_group.slices, weight_array, -index)
        while not end_of_batch:
            nbatch += 1
            batch = next_batch

            mod.forward_backward(batch)
            # update parameters
            mod.update()

            try:
                # pre fetch next batch
                next_batch = next(data_iter)
                if nbatch == num_batch:
                    raise StopIteration
                if kv is not None:
                    row_sparse_pull(kv, 'w', next_batch.data[0], mod._exec_group.slices, weight_array, -index)
            except StopIteration:
                end_of_batch = True
            # accumulate prediction accuracy
            mod.update_metric(metric, batch.label)
        logging.info('epoch %d, %s' % (epoch, metric.get()))
        if epoch == 0:
            print "num_batches = ", nbatch
    if profiler:
        mx.profiler.profiler_set_state('stop')
    end = time.time()
    time_cost = end - start
    logging.info('num_worker = ' + str(num_worker) + ', time cost = ' + str(time_cost))
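
To make the multi-device branch of the `row_sparse_pull` helper above concrete: for each device's slice of examples (taken from `mod._exec_group.slices` in the script), the row ids to pull are the CSR column indices of just those examples, i.e. `indices[indptr[s.start]:indptr[s.stop]]`. A small NumPy illustration with made-up data (the 3-example batch over 6 features and the two slices are assumptions, not values from the benchmark):

import numpy as np

# a tiny CSR batch: 3 examples over a feature space of 6 columns
indptr  = np.array([0, 2, 3, 5])      # example i owns indices[indptr[i]:indptr[i+1]]
indices = np.array([1, 4, 2, 0, 5])   # column ids == rows of the weight to pull
slices  = [slice(0, 2), slice(2, 3)]  # examples 0-1 on device 0, example 2 on device 1

row_idx_per_device = [indices[indptr[s.start]:indptr[s.stop]] for s in slices]
print(row_idx_per_device)             # [array([1, 4, 2]), array([0, 5])]

Each entry of `row_idx_per_device` is then passed as the `row_ids` argument for the corresponding device, so a GPU never receives weight rows its slice of the batch does not reference.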
6 changes: 6 additions & 0 deletions include/mxnet/ndarray.h
@@ -226,6 +226,12 @@ class NDArray {
    return ptr_->aux_shapes;
  }

  /*! returns the dtypes of all aux data */
  const std::vector<int>& aux_types() const {
    CHECK(storage_type() != kDefaultStorage);
    return ptr_->aux_types;
  }

  /*!
   * \brief For a sparse operation on a csr matrix for example,
   * the size of the column index array
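For orientation, the aux data whose dtypes the new `aux_types()` accessor returns are the index arrays that accompany a sparse NDArray (row ids for row_sparse, indptr and column indices for csr). A quick Python-side sketch, assuming a recent MXNet build with the sparse ndarray API:

import mxnet as mx

w = mx.nd.zeros((10, 4), stype='row_sparse')
print(w.indices.dtype)                  # dtype of aux_data(rowsparse::kIdx), typically int64

x = mx.nd.array([[0, 1], [2, 0]]).tostype('csr')
print(x.indptr.dtype, x.indices.dtype)  # dtypes of the csr aux arrays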
4 changes: 3 additions & 1 deletion src/executor/graph_executor.cc
@@ -1151,7 +1151,9 @@ void GraphExecutor::InitDataEntryMemory(std::vector<NDArray>* shared_pool) {
    CHECK_LE(nword, std::numeric_limits<nnvm::dim_t>::max());
    // allocate float arrays
    TShape shape{static_cast<nnvm::dim_t>(nword)};
    NDArray nd(shape, ctx);
    // TODO(junwu): adding delay_alloc=true to create nd
    // is a temporary solution.
    NDArray nd(shape, ctx, true);
Owner:
Adding delay_alloc=True should work just fine. Did it break any unit test?

Collaborator (author):
No, it doesn't break any unit tests.

    data_pool_[i] = nd;
    // put the new allocated arrays to shared pool
    if (shared_pool != nullptr) {
110 changes: 92 additions & 18 deletions src/kvstore/comm.h
@@ -112,9 +112,9 @@ class CommCPU : public Comm {
    // avoid extra copy for single device, but it may bring problems for
    // abnormal usage of kvstore
    if (src.size() == 1) {
      if (src[0].storage_type() == buf.merged.storage_type()) {
      if (src[0].storage_type() == kDefaultStorage) {
        return src[0];
      } else {
      } else {  // if sparse and only one GPU, always update weight on CPU
        CopyFromTo(src[0], &buf.merged, priority);
        return buf.merged;
      }
@@ -188,39 +188,113 @@
    }
  }

  // TODO(haibin) support broadcast row_sparse on GPU
  void BroadcastRowSparse(int key, const NDArray& src,
                          const std::vector<std::pair<NDArray*, NDArray>>& dst,
                          const bool use_copy,
                          const int priority) override {
    using namespace mshadow;
    auto size = dst.size();
    for (size_t i = 0; i < size; i++) {
      auto out = dst[i].first;
      auto row_id = dst[i].second;
    CHECK_EQ(src.storage_type(), kRowSparseStorage)
      << "BroadcastRowSparse expects row-sparse src NDArray";
    CHECK_EQ(src.ctx().dev_mask(), Context::kCPU)
      << "BroadcastRowSparse with src on gpu context not supported";
    for (size_t i = 0; i < dst.size(); ++i) {
      NDArray* out = dst[i].first;
      NDArray row_id = dst[i].second;
      if (use_copy) {
        CopyFromTo(src, out, priority);
      } else {
        CHECK_EQ(out->storage_type(), kRowSparseStorage)
          << "BroadcastRowSparse expects row_sparse dst NDArray";
        CHECK_EQ(out->ctx().dev_mask(), Context::kCPU)
          << "BroadcastRowSparse with dst on gpu context not supported";
        CHECK_EQ(row_id.ctx().dev_mask(), Context::kCPU)
          << "BroadcastRowSparse with src on gpu context not supported";
          << "BroadcastRowSparse with row_indices on gpu context not supported";
        // retain according to unique indices
        Engine::Get()->PushSync([src, out, row_id](RunContext rctx) {
            NDArray *output = out;
            const auto indices = row_id.data();
            op::SparseRetainOpForwardRspImpl<cpu>(rctx.get_stream<cpu>(),
                                                  src, indices, kWriteTo,
                                                  output);
          }, Context::CPU(), {src.var(), row_id.var()}, {out->var()},
          FnProperty::kNormal, priority, PROFILER_MESSAGE("KVStoreSparseRetain"));
        const bool use_sparse_retain = (src.shape()[0] != src.storage_shape()[0])
            || (row_id.dtype() != out->aux_type(rowsparse::kIdx))
            || (out->ctx().dev_mask() != Context::kGPU);
        if (use_sparse_retain) {  // use sparse_retain op
          const bool is_to_gpu = out->ctx().dev_mask() == Context::kGPU;
          NDArray out_cpu = is_to_gpu? NDArray(kRowSparseStorage, src.shape(),
              src.ctx(), true, src.dtype(), src.aux_types()) : *out;
          Engine::Get()->PushSync([=](RunContext rctx) {
              const TBlob& indices = row_id.data();
              NDArray temp = out_cpu;  // get rid of const qualifier
              op::SparseRetainOpForwardRspImpl<cpu>(rctx.get_stream<cpu>(),
                                                    src, indices, kWriteTo,
                                                    &temp);
            }, Context::CPU(), {src.var(), row_id.var()}, {out_cpu.var()},
            FnProperty::kNormal, priority, PROFILER_MESSAGE("KVStoreSparseRetain"));
          if (is_to_gpu) {
            CopyFromTo(out_cpu, out, priority);
          }
        } else {  // direct copy rows
          Engine::Get()->PushSync([=](RunContext rctx) {
              CopyRetainedRowsToGPU(rctx.get_stream<cpu>(), rctx.get_stream<gpu>(),
                                    src, row_id, out);
            }, out->ctx(), {src.var(), row_id.var()}, {out->var()},
            FnProperty::kCopyToGPU, priority, PROFILER_MESSAGE("KVStoreCopyRetainedRowsToGPU"));
        }
      }
    }
  }

 private:
  /*!
   * \brief When src is a rsp with full rows,
   * simply copy retained rows directly from cpu to gpu
   * without invoking sparse_retain op.
   */
  void CopyRetainedRowsToGPU(mshadow::Stream<cpu>* cpu_stream,
                             mshadow::Stream<gpu>* gpu_stream,
                             const NDArray& src,
                             const NDArray& indices,
                             NDArray* dst) {
#if MXNET_USE_CUDA == 1
    CHECK_EQ(src.storage_type(), kRowSparseStorage)
      << "CopyRetainedRowsToGPU expects row-sparse src NDArray";
    CHECK_EQ(src.ctx().dev_mask(), Context::kCPU)
      << "CopyRetainedRowsToGPU with src on gpu context not supported";
    CHECK_EQ(src.storage_shape()[0], src.shape()[0])
      << "CopyRetainedRowsToGPU only supports src rsp with full rows";
    CHECK_EQ(indices.storage_type(), kDefaultStorage);
    CHECK_EQ(indices.ctx().dev_mask(), Context::kCPU);
    CHECK_EQ(dst->storage_type(), kRowSparseStorage);
    CHECK_EQ(dst->ctx().dev_mask(), Context::kGPU);
    CHECK_EQ(indices.dtype(), dst->aux_type(rowsparse::kIdx))
      << "CopyRetainedRowsToGPU only supports same data type for idx array and dst aux_data(0)";
    if (!src.storage_initialized() || indices.data().Size() == 0U) {
      op::FillZerosRspImpl(gpu_stream, dst);
      return;
    }
    using namespace mshadow;

    const TBlob& src_data = src.data();
    const TBlob& idx_data = indices.data();
    const size_t row_length = src.shape().ProdShape(1, src.shape().ndim());
    const size_t num_rows_retained = idx_data.Size();
    dst->CheckAndAlloc({Shape1(num_rows_retained)});
    TBlob dst_data = dst->data();
    TBlob dst_idx_data = dst->aux_data(rowsparse::kIdx);
    MSHADOW_TYPE_SWITCH(src.dtype(), DType, {
      MSHADOW_IDX_TYPE_SWITCH(indices.dtype(), IType, {
        // copy idx array
        Tensor<gpu, 1, IType> dst_idx_tensor = dst_idx_data.FlatTo1D<gpu, IType>(gpu_stream);
        const Tensor<cpu, 1, IType> idx_tensor = idx_data.FlatTo1D<cpu, IType>(cpu_stream);
        Copy(dst_idx_tensor, idx_tensor, gpu_stream);
        // copy src data
        const Tensor<cpu, 2, DType> src_data_tensor = src_data.get_with_shape<cpu, 2, DType>(
            Shape2(src_data.shape_[0], row_length), cpu_stream);
        Tensor<gpu, 2, DType> dst_data_tensor = dst_data.get_with_shape<gpu, 2, DType>(
            Shape2(dst_data.shape_[0], row_length), gpu_stream);
        for (size_t i = 0; i < num_rows_retained; ++i) {
          Copy(dst_data_tensor[i], src_data_tensor[idx_tensor[i]], gpu_stream);
        }
      })
    })
#else
    LOG(FATAL) << "GPU not enabled";
#endif
  }

  // reduce sum into val[0]
  inline void ReduceSumCPU(const std::vector<NDArray> &in_data) {
    MSHADOW_TYPE_SWITCH(in_data[0].dtype(), DType, {
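As a plain NumPy illustration of what both branches of BroadcastRowSparse above compute (whether through the sparse_retain op or the direct row copy in CopyRetainedRowsToGPU): given a source whose rows are all present and a list of requested row ids, the destination ends up holding exactly those rows, with the ids stored alongside as its index (aux) array. The array values here are made up:

import numpy as np

src = np.arange(12, dtype=np.float32).reshape(4, 3)  # 4 full rows of a weight
row_ids = np.array([0, 2], dtype=np.int64)           # rows requested by a device

dst_idx  = row_ids                  # aux data (row indices) of the row_sparse result
dst_data = src[row_ids]             # the retained rows, copied in row_id order
print(dst_idx)                      # [0 2]
print(dst_data)                     # [[0. 1. 2.] [6. 7. 8.]]

The fast path in this PR applies when the source already stores every row, the id dtype matches the destination's index dtype, and the destination lives on a GPU: in that case the rows can be copied straight to the device without first materializing the retained result on the CPU.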