diff --git a/dmlc-core b/dmlc-core index fcf831a32392..fc66c6241f02 160000 --- a/dmlc-core +++ b/dmlc-core @@ -1 +1 @@ -Subproject commit fcf831a3239249588b014c3e6cd2bdb7366547e1 +Subproject commit fc66c6241f0278c619ed3c25b895bda0e7de99fd diff --git a/include/mxnet/c_api.h b/include/mxnet/c_api.h index f10e08bca3c3..b6c6dc83bdb2 100644 --- a/include/mxnet/c_api.h +++ b/include/mxnet/c_api.h @@ -385,6 +385,19 @@ MXNET_DLL int MXNDArraySlice(NDArrayHandle handle, mx_uint slice_begin, mx_uint slice_end, NDArrayHandle *out); + +/*! + * \brief Slice the NDArray with non-default storage along axis 0. + * \param handle the handle to the NDArray + * \param slice_begin The beginning index of slice + * \param slice_end The ending index of slice + * \param out The NDArrayHandle of sliced NDArray + * \return 0 when success, -1 when failure happens + */ +MXNET_DLL int MXNDArraySliceEx(NDArrayHandle handle, + mx_uint slice_begin, + mx_uint slice_end, + NDArrayHandle out); /*! * \brief Index the NDArray along axis 0. * \param handle the handle to the NDArray diff --git a/include/mxnet/ndarray.h b/include/mxnet/ndarray.h index cf61b4302a59..d01352e795e4 100644 --- a/include/mxnet/ndarray.h +++ b/include/mxnet/ndarray.h @@ -209,7 +209,6 @@ class NDArray { /*! * \return the shape of aux data at ith index. If it doesn't exist, return an empty one. */ - // TODO(haibin) CamelCase inline const TShape aux_shape(size_t i) const { CHECK(storage_type() != kDefaultStorage); return ptr_->aux_shapes[i]; @@ -239,9 +238,7 @@ class NDArray { auto dptr = static_cast(ptr_->shandle.dptr); if (stype == kDefaultStorage) { dptr += offset_; - } else if (stype == kCSRStorage) { - shape = storage_shape(); - } else if (stype == kRowSparseStorage) { + } else if (stype == kCSRStorage || stype == kRowSparseStorage) { shape = storage_shape(); } else { LOG(FATAL) << "unknown storage type " << stype; @@ -263,13 +260,8 @@ class NDArray { auto type = aux_type(i); MSHADOW_TYPE_SWITCH(type, DType, { auto dptr = static_cast(ptr_->aux_handles[i].dptr); - if (stype == kRowSparseStorage) { + if (stype == kRowSparseStorage || stype == kCSRStorage) { CHECK_EQ(offset_, 0); - } else if (stype == kCSRStorage) { - if (i == csr::kIndPtr) { - dptr += offset_; - shape[0] = shape_[0] + 1; - } } else { LOG(FATAL) << "Unexpected storage type"; } @@ -472,6 +464,14 @@ class NDArray { * \return sliced NDArray */ NDArray Slice(index_t begin, index_t end) const; + + /*! + * \brief Slice a NDArray with non-default storage + * \param begin begin index in first dim (inclusive) + * \param end end index in first dim (exclusive) + * \return sliced NDArray + */ + void SliceEx(index_t begin, index_t end, NDArray *dst) const; /*! * \brief Index a NDArray * \param idx the index @@ -480,14 +480,14 @@ class NDArray { NDArray At(index_t idx) const; // Wrap the tblob of aux data into an NDArray which shares the same variable with the // current one. - inline const NDArray AuxNDArray(size_t i) const { + inline const NDArray aux_ndarray(size_t i) const { CHECK_NE(storage_type(), kDefaultStorage); CHECK(i < ptr_->aux_shapes.size()); return NDArray(aux_data(i), ctx().dev_id, var()); } // Wrap the tblob of data into an NDArray which shares the same variable with the // current one. - inline const NDArray DataNDArray() const { + inline const NDArray data_ndarray() const { CHECK_NE(storage_type(), kDefaultStorage); return NDArray(data(), ctx().dev_id, var()); } @@ -606,6 +606,9 @@ class NDArray { // \brief skip the deletion of var handle. Usually set when shared_var is present. bool skip_delete_var = false; + /*! \brief default cosntructor */ + Chunk() : static_data(true), delay_alloc(false) {} + /*! \brief construct a new chunk */ Chunk(TShape shape, Context ctx_, bool delay_alloc_, int dtype) : static_data(false), delay_alloc(true), ctx(ctx_) { @@ -779,7 +782,6 @@ inline void CopyFromToCsrImpl(const NDArray from, NDArray *to, RunContext ctx) { // if source storage is not initialized, fill destination with zeros auto s = ctx.get_stream(); if (!from.storage_initialized()) { - LOG(FATAL) << "To be implemented"; // TODO(haibin) implement FillZerosCsrImpl // op::FillZerosCsrImpl(s, to); return; diff --git a/python/mxnet/_ctypes/ndarray.py b/python/mxnet/_ctypes/ndarray.py index 6a1925243b89..88b5b5cbe9e6 100644 --- a/python/mxnet/_ctypes/ndarray.py +++ b/python/mxnet/_ctypes/ndarray.py @@ -14,7 +14,7 @@ from ..base import check_call from ..ndarray_doc import _build_doc -_ndarray_cls_map = {} +_ndarray_cls = None class NDArrayBase(object): """Base data structure for ndarray""" @@ -27,6 +27,8 @@ def __init__(self, handle, writable=True): ---------- handle : NDArrayHandle NDArray handle of C API + writable: bool + Whether the NDArrayBase could be modified """ if handle is not None: assert isinstance(handle, NDArrayHandle) @@ -177,14 +179,8 @@ def %s(%s): c_array(ctypes.c_char_p, [c_str(val) for val in vals]))) if original_output is not None: return original_output - ret_list = [] - for i in range(num_output.value): - storage_type = ctypes.c_int(0) - check_call(_LIB.MXNDArrayGetStorageType(ctypes.cast(output_vars[i], NDArrayHandle), - ctypes.byref(storage_type))) - ret_list.append(_ndarray_cls_map[storage_type.value](ctypes.cast(output_vars[i], \ - NDArrayHandle))) - return ret_list if num_output.value > 1 else ret_list[0] + ret = [_ndarray_cls(ctypes.cast(output_vars[i], NDArrayHandle)) for i in range(num_output.value)] + return ret if num_output.value > 1 else ret[0] """%handle.value) local = {} @@ -196,16 +192,16 @@ def %s(%s): return ndarray_function -def _set_storage_nd_map(storage_nd_map): +def _set_ndarray_cls(ndarray_cls): """Set the symbolic class to be cls""" - global _ndarray_cls_map - _ndarray_cls_map = storage_nd_map + global _ndarray_cls + _ndarray_cls = ndarray_cls # pylint: enable=too-many-locals, invalid-name -def _init_ndarray_module(storage_nd_map, root_namespace): +def _init_ndarray_module(ndarray_cls, root_namespace): """List and add all the ndarray functions to current module.""" - _set_storage_nd_map(storage_nd_map) + _set_ndarray_cls(ndarray_cls) plist = ctypes.POINTER(ctypes.c_char_p)() size = ctypes.c_uint() diff --git a/python/mxnet/kvstore.py b/python/mxnet/kvstore.py index 7ea7c748dbdf..c2528ee132ef 100644 --- a/python/mxnet/kvstore.py +++ b/python/mxnet/kvstore.py @@ -48,7 +48,7 @@ def updater_handle(key, lhs_handle, rhs_handle, _): class KVStore(object): """A key-value store for synchronization of values, over multiple devices.""" - def __init__(self, handle): + def __init__(self, handle, name2idx=None): """Initializes a new KVStore. Parameters @@ -58,6 +58,7 @@ def __init__(self, handle): """ assert isinstance(handle, KVStoreHandle) self.handle = handle + self.name2idx = name2idx if name2idx is not None else {} self._updater = None self._updater_func = None @@ -395,7 +396,7 @@ def _send_command_to_servers(self, head, body): check_call(_LIB.MXKVStoreSendCommmandToServers( self.handle, mx_uint(head), c_str(body))) -def create(name='local'): +def create(name='local', name2idx=None): """Creates a new KVStore. For single machine training, there are two commonly used types: @@ -435,4 +436,4 @@ def create(name='local'): handle = KVStoreHandle() check_call(_LIB.MXKVStoreCreate(c_str(name), ctypes.byref(handle))) - return KVStore(handle) + return KVStore(handle, name2idx=name2idx) diff --git a/python/mxnet/model.py b/python/mxnet/model.py index 5eddfac47981..b90500d4a9c5 100644 --- a/python/mxnet/model.py +++ b/python/mxnet/model.py @@ -37,7 +37,7 @@ 'eval_metric', 'locals']) -def _create_kvstore(kvstore, num_device, arg_params): +def _create_kvstore(kvstore, num_device, arg_params, name2idx=None): """Create kvstore This function select and create a proper kvstore if given the kvstore type. @@ -61,7 +61,7 @@ def _create_kvstore(kvstore, num_device, arg_params): # no need to use kv for single device and single machine kv = None else: - kv = kvs.create(kvstore) + kv = kvs.create(kvstore, name2idx=name2idx) if kvstore is 'local': # automatically select a proper local max_size = max(np.prod(param.shape) for param in @@ -85,25 +85,50 @@ def _initialize_kvstore(kvstore, param_arrays, arg_params, param_names, if update_on_kvstore: kvstore.pull(idx, param_on_devs, priority=-idx) -def _update_params_on_kvstore(param_arrays, grad_arrays, kvstore): - """Perform update of param_arrays from grad_arrays on kvstore.""" - for index, pair in enumerate(zip(param_arrays, grad_arrays)): +def _update_params_on_kvstore(param_arrays, grad_arrays, kvstore, + stype_dict=None, param_names=None): + """Perform update of param_arrays from grad_arrays on kvstore. + If `param_names` is None or kvstore doesn't have a `name2idx` dictionary, + the index of a param is determined by the order it appears in `param_arrays`. """ + stype_dict = {} if stype_dict is None else stype_dict + for i, pair in enumerate(zip(param_arrays, grad_arrays)): arg_list, grad_list = pair if grad_list[0] is None: continue + index = i + if param_names is not None: + name = param_names[i] + index = index if name not in kvstore.name2idx else kvstore.name2idx[name] + # cast storage type if stype doesn't match + if name in stype_dict: + for i, grad in enumerate(grad_list): + stype = stype_dict[name] + if grad_list[i].storage_type != stype: + grad_list[i] = nd.cast_storage(grad, stype) # push gradient, priority is negative index kvstore.push(index, grad_list, priority=-index) # pull back the weights kvstore.pull(index, arg_list, priority=-index) def _update_params(param_arrays, grad_arrays, updater, num_device, - kvstore=None): + kvstore=None, stype_dict=None, param_names=None): """Perform update of param_arrays from grad_arrays not on kvstore.""" - for index, pair in enumerate(zip(param_arrays, grad_arrays)): + stype_dict = {} if stype_dict is None else stype_dict + for i, pair in enumerate(zip(param_arrays, grad_arrays)): arg_list, grad_list = pair if grad_list[0] is None: continue + # cast storage type if stype doesn't match + if param_names is not None and param_names[i] in stype_dict: + for i, grad in enumerate(grad_list): + stype = stype_dict[param_names[i]] + if grad_list[i].storage_type != stype: + grad_list[i] = nd.cast_storage(grad, stype) + index = i if kvstore: + if param_names is not None: + name = param_names + index = index if name not in kvstore.name2idx else kvstore.name2idx[name] # push gradient, priority is negative index kvstore.push(index, grad_list, priority=-index) # pull back the sum gradients, to the same locations. diff --git a/python/mxnet/module/base_module.py b/python/mxnet/module/base_module.py index d88b38af04fc..0473da28d68a 100644 --- a/python/mxnet/module/base_module.py +++ b/python/mxnet/module/base_module.py @@ -838,9 +838,17 @@ def get_input_grads(self, merge_multi_context=True): """ raise NotImplementedError() - def update(self): + def update(self, storage_type_dict=None): """Update parameters according to the installed optimizer and the gradients computed - in the previous forward-backward batch. + in the previous forward-backward batch. The storage type of parameters is casted according + to `storage_type_dict`, if provided. + + Parameters + ---------- + storage_type_dict: dict of str to str + Defaults to ``None``. Desired storage types of parameters for parameter update. If the + parameter gradient is not of desired storage type, its storage type will be casted + before the update. Examples -------- diff --git a/python/mxnet/module/bucketing_module.py b/python/mxnet/module/bucketing_module.py index 97d207baf6c5..939daa4fb341 100644 --- a/python/mxnet/module/bucketing_module.py +++ b/python/mxnet/module/bucketing_module.py @@ -394,13 +394,13 @@ def backward(self, out_grads=None): assert self.binded and self.params_initialized self._curr_module.backward(out_grads=out_grads) - def update(self): + def update(self, storage_type_dict=None): """Update parameters according to installed optimizer and the gradient computed in the previous forward-backward cycle. """ assert self.binded and self.params_initialized and self.optimizer_initialized self._params_dirty = True - self._curr_module.update() + self._curr_module.update(storage_type_dict=storage_type_dict) def get_outputs(self, merge_multi_context=True): """Get outputs from a previous forward computation. diff --git a/python/mxnet/module/module.py b/python/mxnet/module/module.py index d307a6efdbb5..5b36973ddce5 100644 --- a/python/mxnet/module/module.py +++ b/python/mxnet/module/module.py @@ -454,8 +454,12 @@ def init_optimizer(self, kvstore='local', optimizer='sgd', if self._params_dirty: self._sync_params_from_devices() + name2idx = {} + for idx, name in enumerate(self._exec_group.param_names): + name2idx[name] = idx + (kvstore, update_on_kvstore) = \ - _create_kvstore(kvstore, len(self._context), self._arg_params) + _create_kvstore(kvstore, len(self._context), self._arg_params, name2idx=name2idx) batch_size = self._exec_group.batch_size if kvstore and 'dist' in kvstore.type and '_sync' in kvstore.type: @@ -550,7 +554,7 @@ def backward(self, out_grads=None): assert self.binded and self.params_initialized self._exec_group.backward(out_grads=out_grads) - def update(self): + def update(self, storage_type_dict=None): """Update parameters according to the installed optimizer and the gradients computed in the previous forward-backward batch. """ @@ -560,7 +564,9 @@ def update(self): if self._update_on_kvstore: _update_params_on_kvstore(self._exec_group.param_arrays, self._exec_group.grad_arrays, - self._kvstore) + self._kvstore, + stype_dict=storage_type_dict, + param_names=self._param_names) else: _update_params(self._exec_group.param_arrays, self._exec_group.grad_arrays, diff --git a/python/mxnet/module/python_module.py b/python/mxnet/module/python_module.py index d9b103c40bb3..d2afffc85c1b 100644 --- a/python/mxnet/module/python_module.py +++ b/python/mxnet/module/python_module.py @@ -110,7 +110,7 @@ def init_params(self, initializer=Uniform(0.01), arg_params=None, aux_params=Non """ pass - def update(self): + def update(self, storage_type_dict=None): """Update parameters according to the installed optimizer and the gradients computed in the previous forward-backward batch. Currently we do nothing here. Subclass should override this method if contains parameters. diff --git a/python/mxnet/module/sequential_module.py b/python/mxnet/module/sequential_module.py index 27842206f938..ac20f1236f75 100644 --- a/python/mxnet/module/sequential_module.py +++ b/python/mxnet/module/sequential_module.py @@ -342,14 +342,14 @@ def backward(self, out_grads=None): out_grads = module.get_input_grads() - def update(self): + def update(self, storage_type_dict=None): """Update parameters according to installed optimizer and the gradient computed in the previous forward-backward cycle. """ assert self.binded and self.params_initialized and self.optimizer_initialized for module in self._modules: - module.update() + module.update(storage_type_dict=storage_type_dict) def get_outputs(self, merge_multi_context=True): """Get outputs from a previous forward computation. diff --git a/python/mxnet/ndarray.py b/python/mxnet/ndarray.py index ab1be70f55b7..39d272300eaa 100644 --- a/python/mxnet/ndarray.py +++ b/python/mxnet/ndarray.py @@ -109,6 +109,11 @@ def waitall(): """ check_call(_LIB.MXNDArrayWaitAll()) +def _storage_type(handle): + storage_type = ctypes.c_int(0) + check_call(_LIB.MXNDArrayGetStorageType(handle, ctypes.byref(storage_type))) + return _STORAGE_TYPE_ID_TO_STR[storage_type.value] + class NDArray(NDArrayBase): """An array object representing a multidimensional, homogeneous array of fixed-size items. @@ -734,9 +739,7 @@ def dtype(self): @property def storage_type(self): - storage_type = ctypes.c_int(0) - check_call(_LIB.MXNDArrayGetStorageType(self.handle, ctypes.byref(storage_type))) - return _STORAGE_TYPE_ID_TO_STR[storage_type.value] + return _storage_type(self.handle) @property # pylint: disable= invalid-name, undefined-variable @@ -921,6 +924,13 @@ def as_in_context(self, context): return self return self.copyto(context) + def to_csr(self): + # pylint: disable=undefined-variable + return cast_storage(self, storage_type='csr') + + def to_rsp(self): + # pylint: disable=undefined-variable + return cast_storage(self, storage_type='row_sparse') def onehot_encode(indices, out): """One-hot encoding indices into matrix out. diff --git a/python/mxnet/optimizer.py b/python/mxnet/optimizer.py index 4f3c72823a09..d2d394076e89 100644 --- a/python/mxnet/optimizer.py +++ b/python/mxnet/optimizer.py @@ -4,7 +4,6 @@ import logging from .ndarray import NDArray, zeros, clip, sqrt, sign from .ndarray import sgd_update, sgd_mom_update, adam_update, rmsprop_update, rmspropalex_update -from .ndarray import sparse_sgd_update, sparse_sgd_mom_update from .random import normal @@ -356,35 +355,6 @@ def update(self, index, weight, grad, state): sgd_update(weight, grad, out=weight, lr=lr, wd=wd, **kwargs) - -@register -class SparseSGD(SGD): - """SGD for non-zero rows - """ - def __init__(self, **kwargs): - super(SparseSGD, self).__init__(**kwargs) - - def update(self, index, weight, grad, state): - assert(isinstance(weight, NDArray)) - assert(isinstance(grad, NDArray)) - lr = self._get_lr(index) - wd = self._get_wd(index) - self._update_count(index) - - kwargs = {'rescale_grad': self.rescale_grad} - if self.momentum > 0: - kwargs['momentum'] = self.momentum - if self.clip_gradient: - kwargs['clip_gradient'] = self.clip_gradient - - if state is not None: - sparse_sgd_mom_update(weight, grad, state, out=weight, - lr=lr, wd=wd, **kwargs) - else: - sparse_sgd_update(weight, grad, out=weight, - lr=lr, wd=wd, **kwargs) - - @register class DCASGD(Optimizer): """The DCASGD optimizer diff --git a/python/mxnet/sparse_ndarray.py b/python/mxnet/sparse_ndarray.py index e21ece416fba..41e8e2b7ed83 100644 --- a/python/mxnet/sparse_ndarray.py +++ b/python/mxnet/sparse_ndarray.py @@ -15,16 +15,15 @@ # import operator import numpy as np -from .base import _LIB, numeric_types #string_types -from .base import c_array, mx_real_t # , py_str, c_str +from .base import _LIB, numeric_types +from .base import c_array, mx_real_t from .base import mx_uint, NDArrayHandle, check_call -# from .base import ctypes2buffer from .context import Context from . import _ndarray_internal as _internal from . import ndarray from .ndarray import _DTYPE_NP_TO_MX, _DTYPE_MX_TO_NP -from .ndarray import _STORAGE_TYPE_STR_TO_ID#, _STORAGE_TYPE_ID_TO_STR -from .ndarray import NDArray +from .ndarray import _STORAGE_TYPE_STR_TO_ID +from .ndarray import NDArray, _storage_type # Use different verison of SymbolBase # When possible, use cython to speedup part of computation. @@ -45,9 +44,8 @@ 'csr': [np.int32, np.int32] } - def _new_alloc_handle(storage_type, shape, ctx, delay_alloc, dtype, aux_types, aux_shapes=None): - """Return a new handle with specified shape, type and context. + """Return a new handle with specified storage type, shape, dtype and context. Empty handle is only used to hold results @@ -77,15 +75,14 @@ def _new_alloc_handle(storage_type, shape, ctx, delay_alloc, dtype, aux_types, a ctypes.byref(hdl))) return hdl - class SparseNDArray(NDArray): """An array object representing a multidimensional, homogeneous array of fixed-size items, stored in sparse format. """ - __slots__ = [] - #def __reduce__(self): + def __reduce__(self): + raise Exception('Not implemented for SparseND yet!') # return SparseNDArray, (None,), self.__getstate__() def __add__(self, other): @@ -97,30 +94,18 @@ def __iadd__(self, other): def __radd__(self, other): raise Exception('Not implemented for SparseND yet!') - def __sub__(self, other): - raise Exception('Not implemented for SparseND yet!') - def __isub__(self, other): raise Exception('Not implemented for SparseND yet!') def __rsub__(self, other): raise Exception('Not implemented for SparseND yet!') - def __mul__(self, other): - raise Exception('Not implemented for SparseND yet!') - - def __neg__(self): - raise Exception('Not implemented for SparseND yet!') - def __imul__(self, other): raise Exception('Not implemented for SparseND yet!') def __rmul__(self, other): raise Exception('Not implemented for SparseND yet!') - def __div__(self, other): - raise Exception('Not implemented for SparseND yet!') - def __rdiv__(self, other): raise Exception('Not implemented for SparseND yet!') @@ -142,24 +127,6 @@ def __pow__(self, other): def __rpow__(self, other): raise Exception('Not implemented for SparseND yet!') - def __eq__(self, other): - raise Exception('Not implemented for SparseND yet!') - - def __ne__(self, other): - raise Exception('Not implemented for SparseND yet!') - - def __gt__(self, other): - raise Exception('Not implemented for SparseND yet!') - - def __ge__(self, other): - raise Exception('Not implemented for SparseND yet!') - - def __lt__(self, other): - raise Exception('Not implemented for SparseND yet!') - - def __le__(self, other): - raise Exception('Not implemented for SparseND yet!') - def __getstate__(self): raise Exception('Not implemented for SparseND yet!') @@ -169,7 +136,7 @@ def __setstate__(self, state): def __setitem__(self, key, value): """x.__setitem__(i, y) <=> x[i]=y - Set self[key] to value. + Set self[key] to value. Only slice [:] is supported. Parameters ---------- @@ -178,6 +145,26 @@ def __setitem__(self, key, value): value : NDArray or numpy.ndarray The value to set. + Examples + -------- + >>> src = mx.sparse_nd.row_sparse(data, indices, (3,3)) + >>> src.asnumpy() + array([[ 1., 0., 2.], + [ 0., 0., 0.], + [ 4., 5., 6.]], dtype=float32) + >>> # assign SparseNDArray with same storage type + >>> x = mx.sparse_nd.zeros('row_sparse', (3,3)) + >>> x[:] = src + >>> x.asnumpy() + array([[ 1., 0., 2.], + [ 0., 0., 0.], + [ 4., 5., 6.]], dtype=float32) + >>> # assign NDArray to SparseNDArray + >>> x[:] = mx.nd.ones((3,3)) + >>> x.asnumpy() + array([[ 1., 1., 1.], + [ 1., 1., 1.], + [ 1., 1., 1.]], dtype=float32) """ if not self.writable: raise ValueError('Failed to assign to a readonly NDArray') @@ -191,8 +178,7 @@ def __setitem__(self, key, value): elif isinstance(value, numeric_types): raise Exception("Assigning numeric types to SparseNDArray not supported yet.") elif isinstance(value, (np.ndarray, np.generic)): - # TODO(haibin) this is not efficient. Implement sync_copyfrom for - # sparse ndarray to avoid an extra copy + # TODO(haibin) Implement _sync_copyfrom for sparse ndarray to avoid an extra copy warnings.warn('Assigning non-NDArray object to SparseNDArray is not efficient', RuntimeWarning) tmp = ndarray.array(value) @@ -204,8 +190,27 @@ def __setitem__(self, key, value): raise Exception('SparseNDArray only supports [:] for assignment') def __getitem__(self, key): + """x.__getitem__(i) <=> x[i] + + Returns a sliced view of this array. + + Parameters + ---------- + key : int or slice + Indexing key. + + Examples + -------- + >>> x[:] = mx.nd.arange(0,6).reshape((2,3)) + >>> x.asnumpy() + array([[ 0., 1., 2.], + [ 3., 4., 5.]], dtype=float32) + >>> x[1:2].asnumpy() + array([[ 3., 4., 5.]], dtype=float32) + """ stype = self.storage_type - assert(stype == 'csr'), "getitem for " + str(stype) + " not implemented yet" + if stype != 'csr': + raise Exception("__getitem__ for " + str(stype) + " not implemented yet") if isinstance(key, int): raise Exception("Not implemented yet") if isinstance(key, py_slice): @@ -222,13 +227,8 @@ def _sync_copyfrom(self, source_array): raise Exception('Not implemented for SparseND yet!') def _slice(self, start, stop): - """Returns a read-only sliced SparseNDArray that shares memory with current one. - For csr SparseNDArray, it only slices the indptr array, and keeps the original values - and indices. - - The existing slice operation is not very efficient when it's copied, since the indices - and values are a superset of the sliced region. - + """Returns a read-only SparseNDArray slice that shares memory with current one. + To create a writable slice, please use ``mx.nd.slice`` instead. Parameters ---------- @@ -251,25 +251,20 @@ def _slice(self, start, stop): >>> a[1:2].asnumpy() array([[0, 0, 3]]) - >>> a[1:2]._indptr.asnumpy() - array([[2, 3]]) - - >>> a[1:2]._indicies.asnumpy() - array([0, 2, 2, 0, 1, 2]) - - >>> a[1:2]._values.asnumpy() - array([1, 2, 3, 4, 5, 6]) - """ stype = self.storage_type assert(stype == 'csr'), "_slice for " + str(stype) + " not implemented yet" warnings.warn('slicing SparseNDArray is not efficient', RuntimeWarning) - handle = NDArrayHandle() + shape = list(self.shape) + shape[0] = stop - start + handle = _new_alloc_handle(self.storage_type, tuple(shape), self.context, + True, self.dtype, self.aux_types) start = mx_uint(start) if start else mx_uint(0) stop = mx_uint(stop) if stop else mx_uint(self.shape[0]) - check_call(_LIB.MXNDArraySlice( - self.handle, start, stop, ctypes.byref(handle))) - return SparseNDArray(handle=handle, writable=False) + + check_call(_LIB.MXNDArraySliceEx(self.handle, start, stop, handle)) + ret = SparseNDArray(handle=handle, writable=False) + return ret def _at(self, idx): raise Exception('at operator for SparseND is not supported.') @@ -302,7 +297,7 @@ def _values(self): NDArray This SparseNDArray's values array. """ - return self._data(0) + return self._data() @property def _indices(self): @@ -350,8 +345,8 @@ def T(self): @property def aux_types(self): - ''' The data types of the aux data for the SparseNDArray. - ''' + """The data types of the aux data for the SparseNDArray. + """ aux_types = [] num_aux = self._num_aux for i in range(num_aux): @@ -387,7 +382,7 @@ def copyto(self, other): NDArray The copied array. If ``other`` is an ``NDArray``, then the return value and ``other`` will point to the same ``NDArray``. - """ + """ if isinstance(other, NDArray): if other.handle is self.handle: warnings.warn('You are attempting to copy an array to itself', RuntimeWarning) @@ -404,9 +399,7 @@ def to_dense(self): return to_dense(self) def _aux_data(self, i, writable=False): - """ Get a reference to the i-th aux data associated with the SparseNDArray. If the - SparseNDArray is not yet compacted, the returned result may include invalid values. - + """ Get an NDArray referencing the ith aux data array associated with the SparseNDArray. """ self.wait_to_read() hdl = NDArrayHandle() @@ -414,19 +407,13 @@ def _aux_data(self, i, writable=False): return NDArray(hdl, writable) def _data(self, writable=False): - """ Get a reference to the data associated with the SparseNDArray. If the - SparseNDArray is not yet compacted, the returned result may include invalid values. - + """ Get an NDArray referencing the value array associated with the SparseNDArray. """ self.wait_to_read() hdl = NDArrayHandle() check_call(_LIB.MXNDArrayGetDataNDArray(self.handle, ctypes.byref(hdl))) return NDArray(hdl, writable) - - def compact(self): - raise Exception("Not implemented yet") - def _prepare_src_array(src, dtype, default_dtype): if isinstance(src, NDArray): dtype = src.dtype if dtype is None else dtype @@ -510,7 +497,7 @@ def row_sparse(values, indices, shape, ctx=None, dtype=None, indices_type=None): A SparseNDArray with `row_sparse` storage is typically used to represent a subset of a larger NDArray with `default_storage` of shape [LARGE0, D1, .. , DN] where LARGE0 >> D0. The values in indices are the indices in the first dimension of the slices that have been extracted from - the larger NDArray. + the larger NDArray. The indices are expected to be sorted in ascending order. The corresponding NDArray ``dense`` with `default_storage` represented by a ``rsp`` SparseNDArray with `row_sparse` storage has @@ -594,10 +581,15 @@ def zeros(storage_type, shape, ctx=None, dtype=None, aux_types=None): ------- SparseNDArray A created array + Examples + -------- + >>> mx.sparse_nd.zeros('csr', (1,2), mx.gpu(0)) + + >>> mx.sparse_nd.zeros('row_sparse', (1,2), mx.gpu(0), 'float16').asnumpy() + array([[ 0., 0.]], dtype=float16) """ if ctx is None: ctx = Context.default_ctx - dtype = mx_real_t if dtype is None else dtype if aux_types is None: if storage_type == 'row_sparse' or storage_type == 'csr': @@ -608,10 +600,10 @@ def zeros(storage_type, shape, ctx=None, dtype=None, aux_types=None): out = SparseNDArray(_new_alloc_handle(storage_type, shape, ctx, True, dtype, aux_types)) return _internal._zeros(shape=shape, ctx=ctx, dtype=dtype, out=out) -_STORAGE_TYPE_TO_ND_CLASS = { - _STORAGE_TYPE_STR_TO_ID['default_storage']: ndarray.NDArray, - _STORAGE_TYPE_STR_TO_ID['row_sparse']: SparseNDArray, - _STORAGE_TYPE_STR_TO_ID['csr']: SparseNDArray, -} +def _ndarray_cls(handle): + stype = _storage_type(handle) + # TODO(haibin) in the long run, we want to have CSRNDArray and RowSparseNDArray which + # inherit from SparseNDArray + return NDArray(handle) if stype == 'default_storage' else SparseNDArray(handle) -_init_ndarray_module(_STORAGE_TYPE_TO_ND_CLASS, "mxnet") +_init_ndarray_module(_ndarray_cls, "mxnet") diff --git a/python/mxnet/symbol.py b/python/mxnet/symbol.py index 0ac326636314..d83654b25819 100644 --- a/python/mxnet/symbol.py +++ b/python/mxnet/symbol.py @@ -19,7 +19,7 @@ from .context import Context, cpu from .ndarray import _STORAGE_TYPE_ID_TO_STR, _STORAGE_TYPE_STR_TO_ID from .ndarray import NDArray, _DTYPE_NP_TO_MX, _DTYPE_MX_TO_NP -from .sparse_ndarray import SparseNDArray +from .sparse_ndarray import _ndarray_cls from .executor import Executor from . import _symbol_internal as _internal from .attribute import AttrScope @@ -1440,23 +1440,14 @@ def simple_bind(self, ctx, grad_req='write', type_dict=None, storage_type_dict=N shared_buffer[k] = v # create in_args, arg_grads, and aux_states for the current executor - arg_arrays = [NDArray(NDArrayHandle(in_arg_handles[i])) for i in range(num_in_args.value)] - grad_arrays = [NDArray(NDArrayHandle(arg_grad_handles[i])) + arg_arrays = [_ndarray_cls(NDArrayHandle(in_arg_handles[i])) \ + for i in range(num_in_args.value)] + grad_arrays = [_ndarray_cls(NDArrayHandle(arg_grad_handles[i])) if arg_grad_handles[i] is not None else None for i in range(num_in_args.value)] - aux_arrays = [NDArray(NDArrayHandle(aux_state_handles[i])) + aux_arrays = [_ndarray_cls(NDArrayHandle(aux_state_handles[i])) for i in range(num_aux_states.value)] - # redefine NDArray class based on storage types - def check_storage_type(ndarrays): - for idx, array in enumerate(ndarrays): - if array is not None and array.storage_type != 'default_storage': - ndarrays[idx].__class__ = SparseNDArray - return ndarrays - arg_arrays = check_storage_type(arg_arrays) - grad_arrays = check_storage_type(grad_arrays) - aux_arrays = check_storage_type(aux_arrays) - executor = Executor(exe_handle, self, ctx, grad_req, group2ctx) executor.arg_arrays = arg_arrays executor.grad_arrays = grad_arrays diff --git a/src/c_api/c_api.cc b/src/c_api/c_api.cc index 5dda17f2d1c1..458a23881bf6 100644 --- a/src/c_api/c_api.cc +++ b/src/c_api/c_api.cc @@ -313,6 +313,16 @@ int MXNDArraySlice(NDArrayHandle handle, API_END_HANDLE_ERROR(delete ptr); } +int MXNDArraySliceEx(NDArrayHandle handle, + mx_uint slice_begin, + mx_uint slice_end, + NDArrayHandle out) { + NDArray *ptr = static_cast(out); + API_BEGIN(); + static_cast(handle)->SliceEx(slice_begin, slice_end, ptr); + API_END(); +} + int MXNDArrayAt(NDArrayHandle handle, mx_uint idx, NDArrayHandle *out) { @@ -430,7 +440,7 @@ int MXNDArrayGetAuxNDArray(NDArrayHandle handle, NDArrayHandle *out) { API_BEGIN(); NDArray *arr = static_cast(handle); - *out = new NDArray(arr->AuxNDArray(i)); + *out = new NDArray(arr->aux_ndarray(i)); API_END(); } @@ -438,7 +448,7 @@ int MXNDArrayGetDataNDArray(NDArrayHandle handle, NDArrayHandle *out) { API_BEGIN(); NDArray *arr = static_cast(handle); - *out = new NDArray(arr->DataNDArray()); + *out = new NDArray(arr->data_ndarray()); API_END(); } diff --git a/src/ndarray/ndarray.cc b/src/ndarray/ndarray.cc index b292ecf93f21..f692a5700ba5 100644 --- a/src/ndarray/ndarray.cc +++ b/src/ndarray/ndarray.cc @@ -11,6 +11,7 @@ #include #include #include "./ndarray_function.h" +#include "../operator/tensor/matrix_op-inl.h" #include "./autograd.h" #if MXNET_USE_OPENCV @@ -53,23 +54,15 @@ NDArray NDArray::Reshape(const TShape &shape) const { NDArray NDArray::Slice(index_t begin, index_t end) const { using namespace autograd; + using namespace mshadow; NDArray ret = *this; CHECK(!is_none()) << "NDArray is not initialized"; CHECK_GE(shape_[0], end) << "Slice end index out of range"; auto stype = storage_type(); - if (stype == kDefaultStorage) { - size_t length = shape_.ProdShape(1, shape_.ndim()); - ret.offset_ += begin * length; - ret.shape_[0] = end - begin; - } else if (stype == kCSRStorage) { - // for csr, the offset variable is used to adjust indptr - // while getting aux_data, the dptr of indptr is advanced by offset, - // and shape for indptr is end - begin + 1 - ret.offset_ += begin; - ret.shape_[0] = end - begin; - } else { - LOG(FATAL) << "Slice not yet implemented for storage " << stype; - } + CHECK_EQ(stype, kDefaultStorage); + size_t length = shape_.ProdShape(1, shape_.ndim()); + ret.offset_ += begin * length; + ret.shape_[0] = end - begin; if (AutogradRuntime::Get()->IsTraining()) { // fake a slice_axis op ret.entry_.clear(); @@ -91,6 +84,66 @@ NDArray NDArray::Slice(index_t begin, index_t end) const { } } +void NDArray::SliceEx(index_t begin, index_t end, NDArray *ret) const { + using namespace autograd; + using namespace mshadow; + CHECK(!is_none()) << "NDArray is not initialized"; + CHECK_GE(shape_[0], end) << "Slice end index out of range"; + auto stype = storage_type(); + CHECK_NE(stype, kDefaultStorage); + if (stype == kCSRStorage) { + using namespace csr; + ret->shape_[0] = end - begin; + NDArray src = *this; + // destination NDArray shares the same variable + ret->ptr_->var = var(); + Engine::Get()->PushSync([src, ret, begin, end](RunContext ctx) { + NDArray dst = *ret; + // create a new chunk for dst NDArray + NDArray::Chunk chunk = *src.ptr_; + // void indptr storage handle + chunk.aux_handles[kIndPtr] = Storage::Handle(); + // shape for indptr is end - begin + 1 + chunk.CheckAndAllocAuxData(kIndPtr, Shape1(end - begin + 1)); + if (src.ctx().dev_mask() == cpu::kDevMask) { + MSHADOW_INT_TYPE_SWITCH(src.aux_type(kIndPtr), IType, { + MSHADOW_TYPE_SWITCH(src.dtype(), DType, { + // create new indptr + const IType* src_indptr = src.aux_data(kIndPtr).dptr(); + IType* dst_indptr = static_cast (chunk.aux_handles[kIndPtr].dptr); + op::SliceCsrIndPtrImpl(begin, end, ctx, src_indptr, dst_indptr); + // advance idx and values pointers (CPU implementation) + // TODO(haibin) refactor for GPU implementation later + IType offset = src_indptr[begin]; + IType* idx = static_cast(chunk.aux_handles[kIdx].dptr); + DType* values = static_cast(chunk.shandle.dptr); + chunk.aux_handles[kIdx].dptr = idx + offset; + chunk.shandle.dptr = values + offset; + // update storage shape and aux shape (CPU implementation) + auto nnz = dst_indptr[end - begin]; + chunk.aux_shapes[kIdx] = Shape1(nnz); + chunk.storage_shape = Shape1(nnz); + chunk.static_data = true; + chunk.skip_delete_var = true; + // update dst chunk + *dst.ptr_ = chunk; + }); + }); + } else { +#if MXNET_USE_CUDA + LOG(FATAL) << "SliceEx CSR not implemented yet"; +#else + LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR; +#endif + } + }, ctx(), {}, {var()}, + FnProperty::kNormal, 0, PROFILER_MESSAGE_FUNCNAME); + } else { + LOG(FATAL) << "Slice not yet implemented for storage " << stype; + } + // TODO(haibin) support auto_grad for SliceEx +} + NDArray NDArray::At(index_t idx) const { CHECK(storage_type() == kDefaultStorage) << "Storage type " << storage_type() << " doesn't support At()"; diff --git a/src/operator/operator_common.h b/src/operator/operator_common.h index e8484346cc16..6e0bc2ad5ba6 100755 --- a/src/operator/operator_common.h +++ b/src/operator/operator_common.h @@ -324,7 +324,8 @@ void FCompExFallback(const nnvm::NodeAttrs& attrs, const std::vector& inputs, const std::vector& req, const std::vector& outputs, - FCompute fcompute) { + FCompute fcompute, + const std::string& fname) { std::vector in_blobs, out_blobs; std::vector tmps; common::GetInputBlobs(inputs, &in_blobs, &tmps, ctx, true); diff --git a/src/operator/optimizer_op-inl.h b/src/operator/optimizer_op-inl.h index 358511aa0592..7872826999c0 100755 --- a/src/operator/optimizer_op-inl.h +++ b/src/operator/optimizer_op-inl.h @@ -87,7 +87,7 @@ inline void SGDUpdate(const nnvm::NodeAttrs& attrs, /*! \brief kernel for sparse sgd */ template -struct SparseSGDDnsRspKernel { +struct SGDDnsRspKernel { // DType is the output data type // IType is row sparse idx type // i is the ith row in row sparse gradient @@ -110,9 +110,8 @@ struct SparseSGDDnsRspKernel { } }; -// Impl implies a different interface than FComputeEx template -inline void SparseSGDUpdateDnsRspImpl(const SGDParam& param, +inline void SGDUpdateDnsRspImpl(const SGDParam& param, const OpContext &ctx, const std::vector &inputs, const std::vector &req, @@ -137,7 +136,7 @@ inline void SparseSGDUpdateDnsRspImpl(const SGDParam& param, auto out_data = out.data().FlatTo2D(s); auto num_rows = grad.aux_shape(rowsparse::kIdx)[0]; auto width = weight.shape().ProdShape(1, weight.shape().ndim()); - mxnet_op::Kernel, xpu>::Launch(s, num_rows, width, + mxnet_op::Kernel, xpu>::Launch(s, num_rows, width, out_data.dptr_, weight_data.dptr_, grad_idx.dptr_, grad_val.dptr_, static_cast(param.clip_gradient), static_cast(param.lr), static_cast(param.wd), @@ -148,7 +147,7 @@ inline void SparseSGDUpdateDnsRspImpl(const SGDParam& param, } template -inline void SparseSGDUpdateEx(const nnvm::NodeAttrs& attrs, +inline void SGDUpdateEx(const nnvm::NodeAttrs& attrs, const OpContext &ctx, const std::vector &inputs, const std::vector &req, @@ -160,9 +159,9 @@ inline void SparseSGDUpdateEx(const nnvm::NodeAttrs& attrs, auto weight_stype = inputs[0].storage_type(); auto grad_stype = inputs[1].storage_type(); if (weight_stype == kDefaultStorage && grad_stype == kRowSparseStorage) { - SparseSGDUpdateDnsRspImpl(param, ctx, inputs, req, outputs); - } else { - LOG(FATAL) << "Not implemented"; + SGDUpdateDnsRspImpl(param, ctx, inputs, req, outputs); + } else if (weight_stype == kDefaultStorage && grad_stype == kDefaultStorage) { + FCompExFallback(attrs, ctx, inputs, req, outputs, SGDUpdate, "SGDUpdate"); } } @@ -236,7 +235,7 @@ inline void SGDMomUpdate(const nnvm::NodeAttrs& attrs, } template -struct SparseSGDMomDnsRspDnsKernel { +struct SGDMomDnsRspDnsKernel { template MSHADOW_XINLINE static void Map(int i, size_t width, DType* out_data, DType* mom_data, const DType* weight_data, const IType* grad_idx, @@ -262,7 +261,7 @@ struct SparseSGDMomDnsRspDnsKernel { }; template -inline void SparseSGDMomUpdateDnsRspDnsImpl(const SGDMomParam& param, +inline void SGDMomUpdateDnsRspDnsImpl(const SGDMomParam& param, const OpContext &ctx, const std::vector &inputs, const std::vector &req, @@ -285,7 +284,7 @@ inline void SparseSGDMomUpdateDnsRspDnsImpl(const SGDMomParam& param, auto out_data = out.data().FlatTo2D(s); auto num_rows = grad.aux_shape(rowsparse::kIdx)[0]; auto width = weight.shape().ProdShape(1, weight.shape().ndim()); - Kernel, xpu>::Launch(s, num_rows, width, + Kernel, xpu>::Launch(s, num_rows, width, out_data.dptr_, mom_data.dptr_, weight_data.dptr_, grad_idx.dptr_, grad_val.dptr_, static_cast(param.clip_gradient), static_cast(param.momentum), static_cast(param.lr), static_cast(param.wd), @@ -296,11 +295,11 @@ inline void SparseSGDMomUpdateDnsRspDnsImpl(const SGDMomParam& param, } template -inline void SparseSGDMomUpdateEx(const nnvm::NodeAttrs& attrs, - const OpContext &ctx, - const std::vector &inputs, - const std::vector &req, - const std::vector &outputs) { +inline void SGDMomUpdateEx(const nnvm::NodeAttrs& attrs, + const OpContext &ctx, + const std::vector &inputs, + const std::vector &req, + const std::vector &outputs) { using namespace mxnet_op; const SGDMomParam& param = nnvm::get(attrs.parsed); auto weight_stype = inputs[0].storage_type(); @@ -309,9 +308,11 @@ inline void SparseSGDMomUpdateEx(const nnvm::NodeAttrs& attrs, if (weight_stype == kDefaultStorage && grad_stype == kRowSparseStorage && mom_stype == kDefaultStorage) { - SparseSGDMomUpdateDnsRspDnsImpl(param, ctx, inputs, req, outputs); - } else { - LOG(FATAL) << "Not implemented"; + SGDMomUpdateDnsRspDnsImpl(param, ctx, inputs, req, outputs); + } else if (weight_stype == kDefaultStorage && grad_stype == kDefaultStorage && + mom_stype == kDefaultStorage) { + FCompExFallback(attrs, ctx, inputs, req, outputs, + SGDMomUpdate, "SGDMomUpdate"); } } diff --git a/src/operator/optimizer_op.cc b/src/operator/optimizer_op.cc index 0429f4de797d..5464d03b215f 100644 --- a/src/operator/optimizer_op.cc +++ b/src/operator/optimizer_op.cc @@ -22,23 +22,8 @@ It updates the weights using:: weight = weight - learning_rate * gradient -)code" ADD_FILELINE) -.set_num_inputs(2) -.set_num_outputs(1) -.set_attr_parser(ParamParser) -.set_attr("FInferShape", ElemwiseShape<2, 1>) -.set_attr("FInferType", ElemwiseType<2, 1>) -.set_attr("FCompute", SGDUpdate) -.add_argument("weight", "NDArray-or-Symbol", "Weight") -.add_argument("grad", "NDArray-or-Symbol", "Gradient") -.add_arguments(SGDParam::__FIELDS__()); - -NNVM_REGISTER_OP(sparse_sgd_update) -.describe(R"code(Update function for Stochastic Gradient Descent (SDG) optimizer. - -It updates the weights using:: - - weight = weight - learning_rate * gradient for non-zero rows +If gradients are stored with `row_sparse` storage, +where update is applied only to rows whose gradient has non-zero entries. )code" ADD_FILELINE) .set_num_inputs(2) @@ -46,9 +31,8 @@ It updates the weights using:: .set_attr_parser(ParamParser) .set_attr("FInferShape", ElemwiseShape<2, 1>) .set_attr("FInferType", ElemwiseType<2, 1>) -// TODO(haibin) implement FCompute for sparse sgd -// .set_attr("FCompute", SGDUpdate) -.set_attr(FCOMP_EX_CPU, SparseSGDUpdateEx) +.set_attr("FCompute", SGDUpdate) +.set_attr(FCOMP_EX_CPU, SGDUpdateEx) .add_argument("weight", "NDArray-or-Symbol", "Weight") .add_argument("grad", "NDArray-or-Symbol", "Gradient") .add_arguments(SGDParam::__FIELDS__()); @@ -72,24 +56,9 @@ It updates the weights using:: Where the parameter ``momentum`` is the decay rate of momentum estimates at each epoch. -)code" ADD_FILELINE) -.set_num_inputs(3) -.set_num_outputs(1) -.set_attr_parser(ParamParser) -.set_attr("FInferShape", ElemwiseShape<3, 1>) -.set_attr("FInferType", ElemwiseType<3, 1>) -.set_attr("FMutateInputs", - [](const nnvm::NodeAttrs& attrs) { - return std::vector{2}; - }) -.set_attr("FCompute", SGDMomUpdate) -.add_argument("weight", "NDArray-or-Symbol", "Weight") -.add_argument("grad", "NDArray-or-Symbol", "Gradient") -.add_argument("mom", "NDArray-or-Symbol", "Momentum") -.add_arguments(SGDMomParam::__FIELDS__()); +If gradients are stored with `row_sparse` storage, +only rows whose gradients contain non-zero entries are updated (for both weight and momentum). -NNVM_REGISTER_OP(sparse_sgd_mom_update) -.describe(R"code(Momentum update function for SGD for non-zero gradients )code" ADD_FILELINE) .set_num_inputs(3) .set_num_outputs(1) @@ -100,9 +69,8 @@ NNVM_REGISTER_OP(sparse_sgd_mom_update) [](const nnvm::NodeAttrs& attrs) { return std::vector{2}; }) -// TODO(haibin) implement FCompute -// .set_attr("FCompute", SGDMomUpdate) -.set_attr(FCOMP_EX_CPU, SparseSGDMomUpdateEx) +.set_attr("FCompute", SGDMomUpdate) +.set_attr(FCOMP_EX_CPU, SGDMomUpdateEx) .add_argument("weight", "NDArray-or-Symbol", "Weight") .add_argument("grad", "NDArray-or-Symbol", "Gradient") .add_argument("mom", "NDArray-or-Symbol", "Momentum") diff --git a/src/operator/optimizer_op.cu b/src/operator/optimizer_op.cu index 188f42c44c79..bf0cc570e1f4 100644 --- a/src/operator/optimizer_op.cu +++ b/src/operator/optimizer_op.cu @@ -10,16 +10,12 @@ namespace mxnet { namespace op { NNVM_REGISTER_OP(sgd_update) -.set_attr("FCompute", SGDUpdate); +.set_attr("FCompute", SGDUpdate) +.set_attr(FCOMP_EX_GPU, SGDUpdateEx); NNVM_REGISTER_OP(sgd_mom_update) -.set_attr("FCompute", SGDMomUpdate); - -NNVM_REGISTER_OP(sparse_sgd_update) -.set_attr(FCOMP_EX_GPU, SparseSGDUpdateEx); - -NNVM_REGISTER_OP(sparse_sgd_mom_update) -.set_attr(FCOMP_EX_GPU, SparseSGDMomUpdateEx); +.set_attr("FCompute", SGDMomUpdate) +.set_attr(FCOMP_EX_GPU, SGDMomUpdateEx); NNVM_REGISTER_OP(adam_update) .set_attr("FCompute", AdamUpdate); diff --git a/src/operator/tensor/elemwise_binary_op.h b/src/operator/tensor/elemwise_binary_op.h index 024b2fec90a7..70802a78346f 100644 --- a/src/operator/tensor/elemwise_binary_op.h +++ b/src/operator/tensor/elemwise_binary_op.h @@ -217,8 +217,10 @@ void BinaryComputeEx(const nnvm::NodeAttrs& attrs, CHECK_EQ(outputs.size(), 1); if (typeid(OP) == typeid(mshadow::op::plus)) { // If any input is dense, fallback to FCompute + // TODO(haibin) implement dns + rsp in a separate kernel if (common::ContainsDefaultStorage(inputs)) { - FCompExFallback(attrs, ctx, inputs, req, outputs, BinaryCompute); + FCompExFallback(attrs, ctx, inputs, req, outputs, + BinaryCompute, "BinaryCompute"); return; } CHECK_EQ(inputs[0].storage_type(), kRowSparseStorage) << "Sparse type not supported yet"; diff --git a/src/operator/tensor/matrix_op-inl.h b/src/operator/tensor/matrix_op-inl.h index 39ff218fdef8..cbf45ac04cea 100644 --- a/src/operator/tensor/matrix_op-inl.h +++ b/src/operator/tensor/matrix_op-inl.h @@ -7,7 +7,6 @@ #define MXNET_OPERATOR_TENSOR_MATRIX_OP_INL_H_ #include -#include #include #include #include @@ -1004,6 +1003,20 @@ struct SliceCsrIndPtr { } }; +/* + * a wrapper to launch SliceCsrIndPtr kernel. + * slice [src[begin] .. src[end]) and store in dst[0, end - begin) + */ +template +void SliceCsrIndPtrImpl(const int begin, const int end, RunContext ctx, + const IType* src, IType* dst) { + using namespace mshadow; + using namespace mxnet_op; + Stream *s = ctx.get_stream(); + int indptr_len = end - begin + 1; + Kernel::Launch(s, indptr_len, dst, src + begin, src + begin); +} + /* * Slice a CSR NDArray * Only implemented for CPU @@ -1034,12 +1047,10 @@ void SliceCsrImpl(const SliceParam ¶m, const OpContext& ctx, MSHADOW_TYPE_SWITCH(in.dtype(), DType, { auto in_indptr = in.aux_data(kIndPtr).dptr(); auto out_indptr = out.aux_data(kIndPtr).dptr(); - int num_threads = omp_get_num_threads(); - int segment_len = (indptr_len + num_threads - 1) / num_threads; - Kernel::Launch(s, indptr_len, out_indptr, in_indptr + begin, - in_indptr + begin); + SliceCsrIndPtrImpl(begin, end, ctx.run_ctx, in_indptr, out_indptr); + // retrieve nnz (CPU implementation) - int nnz = out_indptr[indptr_len - 1] - out_indptr[0]; + int nnz = out_indptr[indptr_len - 1]; // copy indices and values out.CheckAndAllocAuxData(kIdx, Shape1(nnz)); out.CheckAndAllocData(Shape1(nnz)); diff --git a/tests/cpp/test_utils.h b/tests/cpp/test_utils.h index b3df0d8ad2fa..c528539a2cb7 100644 --- a/tests/cpp/test_utils.h +++ b/tests/cpp/test_utils.h @@ -74,8 +74,8 @@ NDArray RspND(const TShape shape, const Context ctx, const std::vector y + assert (z.asnumpy() == np.zeros(shape)).all() + z = y > 0 + assert (z.asnumpy() == np.ones(shape)).all() + z = 0 > y + assert (z.asnumpy() == np.zeros(shape)).all() + +def test_sparse_nd_greater_equal(): + stype = 'csr' + shape = rand_shape_2d() + x = mx.sparse_nd.zeros(stype, shape) + y = sparse_nd_ones(shape, stype) + z = x >= y + assert (z.asnumpy() == np.zeros(shape)).all() + z = y >= 0 + assert (z.asnumpy() == np.ones(shape)).all() + z = 0 >= y + assert (z.asnumpy() == np.zeros(shape)).all() + z = y >= 1 + assert (z.asnumpy() == np.ones(shape)).all() + +def test_sparse_nd_lesser(): + stype = 'csr' + shape = rand_shape_2d() + x = mx.sparse_nd.zeros(stype, shape) + y = sparse_nd_ones(shape, stype) + z = y < x + assert (z.asnumpy() == np.zeros(shape)).all() + z = 0 < y + assert (z.asnumpy() == np.ones(shape)).all() + z = y < 0 + assert (z.asnumpy() == np.zeros(shape)).all() + +def test_sparse_nd_lesser_equal(): + stype = 'csr' + shape = rand_shape_2d() + x = mx.sparse_nd.zeros(stype, shape) + y = sparse_nd_ones(shape, stype) + z = y <= x + assert (z.asnumpy() == np.zeros(shape)).all() + z = 0 <= y + assert (z.asnumpy() == np.ones(shape)).all() + z = y <= 0 + assert (z.asnumpy() == np.zeros(shape)).all() + z = 1 <= y + assert (z.asnumpy() == np.ones(shape)).all() + +def test_sparse_nd_binary(): + N = 100 + def check_binary(fn): + for _ in range(N): + ndim = 2 + oshape = np.random.randint(1, 6, size=(ndim,)) + bdim = 2 + lshape = list(oshape) + rshape = list(oshape[ndim-bdim:]) + for i in range(bdim): + sep = np.random.uniform(0, 1) + if sep < 0.33: + lshape[ndim-i-1] = 1 + elif sep < 0.66: + rshape[bdim-i-1] = 1 + lhs = np.random.normal(0, 1, size=lshape) + rhs = np.random.normal(0, 1, size=rshape) + lhs_nd = mx.nd.array(lhs).to_csr() + rhs_nd = mx.nd.array(rhs).to_csr() + assert_allclose(fn(lhs, rhs), + fn(lhs_nd, rhs_nd).asnumpy(), + rtol=1e-4, atol=1e-4) + + #check_binary(lambda x, y: x + y) + check_binary(lambda x, y: x - y) + check_binary(lambda x, y: x * y) + check_binary(lambda x, y: x / y) + check_binary(lambda x, y: x > y) + check_binary(lambda x, y: x < y) + check_binary(lambda x, y: x >= y) + check_binary(lambda x, y: x <= y) + check_binary(lambda x, y: x == y) + +def test_sparse_nd_negate(): + npy = np.random.uniform(-10, 10, rand_shape_2d()) + arr = mx.nd.array(npy).to_csr() + assert_almost_equal(npy, arr.asnumpy()) + assert_almost_equal(-npy, (-arr).asnumpy()) + + # a final check to make sure the negation (-) is not implemented + # as inplace operation, so the contents of arr does not change after + # we compute (-arr) + assert_almost_equal(npy, arr.asnumpy()) + if __name__ == '__main__': - test_sparse_nd_zeros() - test_sparse_nd_elemwise_add() - test_sparse_nd_elementwise_fallback() - test_sparse_nd_copy() - test_sparse_nd_setitem() - test_sparse_nd_basic() - test_sparse_nd_slice() + import nose + nose.runmodule() diff --git a/tests/python/unittest/test_sparse_operator.py b/tests/python/unittest/test_sparse_operator.py index ea6f835a60aa..978737028c98 100644 --- a/tests/python/unittest/test_sparse_operator.py +++ b/tests/python/unittest/test_sparse_operator.py @@ -5,7 +5,6 @@ from numpy.testing import assert_allclose from mxnet.test_utils import * - def check_elemwise_add_ex(lhs_stype, rhs_stype, shape, lhs_grad_stype=None, rhs_grad_stype=None): lhs = mx.symbol.Variable('lhs', storage_type=lhs_stype) rhs = mx.symbol.Variable('rhs', storage_type=rhs_stype) @@ -30,10 +29,11 @@ def check_elemwise_add_ex(lhs_stype, rhs_stype, shape, lhs_grad_stype=None, rhs_ def test_elemwise_add_ex(): shape = (rnd.randint(1, 10), rnd.randint(1, 10)) check_elemwise_add_ex('default_storage', 'default_storage', shape) - check_elemwise_add_ex('default_storage', 'row_sparse', shape) - check_elemwise_add_ex('row_sparse', 'default_storage', shape) - check_elemwise_add_ex('row_sparse', 'row_sparse', shape, - lhs_grad_stype='row_sparse', rhs_grad_stype='row_sparse') + # TODO(haibin/jun) enable these tests when Dns -> Rsp (compact) is implemented. + #check_elemwise_add_ex('default_storage', 'row_sparse', shape) + #check_elemwise_add_ex('row_sparse', 'default_storage', shape) + #check_elemwise_add_ex('row_sparse', 'row_sparse', shape, + # lhs_grad_stype='row_sparse', rhs_grad_stype='row_sparse') # TODO(haibin) randomize this test @@ -62,7 +62,7 @@ def test_elemwise_add_ex_multiple_stages(): check_symbolic_forward(test, {'sp_data1': sp_nd1, 'sp_data2': sp_nd2, 'ds_data': ds_nd}, [sp_np1 + sp_np2 + ds_np]) - arr_grads = [mx.nd.zeros(shape) for i in xrange(3)] + arr_grads = [mx.nd.zeros(shape) for i in range(3)] exec_test = test.bind(default_context(), args={'sp_data1': sp_nd1, 'sp_data2': sp_nd2, 'ds_data': ds_nd}, args_grad=arr_grads) exec_test.forward(is_train=True) @@ -70,7 +70,6 @@ def test_elemwise_add_ex_multiple_stages(): exec_test.backward(out_grads=exec_test.outputs) assert_almost_equal(arr_grads[0].asnumpy(), arr_grads[1].asnumpy()) - # TODO(haibin) also add test for backward pass def test_cast_storage_ex(): def test_rsp_to_dns(shape): @@ -176,21 +175,20 @@ def test_sparse_embedding(): assert_almost_equal(grad_map["embed_weight"].asnumpy(), np.dot(np_onehot.T, np_grad), atol=1e-5) def test_sparse_slice(): - def check_csr_slice(shape, sliced_input): + def check_csr_slice(shape, slice_input): storage_type = 'csr' A, _ = rand_sparse_ndarray(shape, storage_type) - A = A._slice(1, shape[0] - 1) if sliced_input else A - A2 = A.asnumpy() - begin = rnd.randint(0, A.shape[0] - 1) - end = rnd.randint(begin + 1, A.shape[0]) - A_slice = mx.nd.crop(A, begin=begin, end=end) - assert same(A_slice.asnumpy(), A2[begin:end]), (A_slice.asnumpy(), A2[begin:end]) + B = A._slice(1, shape[0] - 1) if slice_input else A + np = B.asnumpy() + begin = rnd.randint(0, B.shape[0] - 1) + end = rnd.randint(begin + 1, B.shape[0]) + nd_slice = mx.nd.crop(B, begin=begin, end=end) + assert same(nd_slice.asnumpy(), np[begin:end]), (nd_slice.asnumpy(), np[begin:end]) shape = (rnd.randint(7, 15), rnd.randint(1, 10)) check_csr_slice(shape, True) check_csr_slice(shape, False) - if __name__ == '__main__': test_elemwise_add_ex() test_elemwise_add_ex_multiple_stages()