This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

add stream wait to all pushes, change kvstore to redef pinned memory #62

Merged: 1 commit, Sep 12, 2015
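In short: every lambda this PR pushes to the dependency engine for a GPU context now ends with ctx.get_stream<gpu>()->Wait(), so the engine only marks an operation finished once its CUDA kernels have actually completed on the stream; and KVStoreLocal now falls back to an ordinary CPU context when MXNET_USE_CUDA is off instead of unconditionally requesting pinned memory. A minimal sketch of the push-plus-wait pattern (illustrative only; DoGpuWork is a hypothetical stand-in for the kernel launch each real lambda performs):

Engine::Get()->Push([ret](RunContext ctx) {
    ret.ptr_->CheckAndAlloc();   // lazily allocate the output
    DoGpuWork(ret.data(), ctx);  // enqueue work on the context's CUDA stream
    // Without this wait, the engine would consider the operation done
    // while the kernel may still be in flight on the stream.
    ctx.get_stream<gpu>()->Wait();
  }, ret.ctx(), const_vars, {ret.ptr_->var});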
4 changes: 3 additions & 1 deletion include/mxnet/base.h
@@ -33,6 +33,9 @@
#define MXNET_USE_CUDNN MSHADOW_USE_CUDNN
#endif

+/*! \brief Error message for using gpu when MXNET_USE_CUDA==0 */
+#define MXNET_GPU_NOT_ENABLED_ERROR "GPU is not enabled"

/*! \brief namespace of mxnet */
namespace mxnet {
/*! \brief mxnet cpu */
@@ -50,7 +53,6 @@
typedef mshadow::TShape TShape;
typedef mshadow::TBlob TBlob;
} // namespace mxnet


//! \cond Doxygen_Suppress
namespace dmlc {
// Add a few patches to support TShape in dmlc/parameter.
9 changes: 9 additions & 0 deletions include/mxnet/context.h
@@ -83,6 +83,15 @@ struct RunContext {
* \brief the stream of the device, can be NULL or Stream<gpu>* in GPU mode
*/
void *stream;
+  /*!
+   * \brief get mshadow stream from Context
+   * \return the mshadow stream
+   * \tparam xpu the device type of the stream
+   */
+  template<typename xpu>
+  inline mshadow::Stream<xpu>* get_stream() const {
+    return static_cast<mshadow::Stream<xpu>*>(stream);
+  }
};

/*!
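With get_stream on RunContext, any pushed function can recover the typed mshadow stream without repeating the static_cast at each call site. A hedged usage sketch (assumes the RunContext was created for a GPU device, so stream really points at a mshadow::Stream<gpu>):

void ExampleExecFun(RunContext ctx) {
  mshadow::Stream<gpu>* s = ctx.get_stream<gpu>();
  // ... launch kernels on s ...
  s->Wait();  // returns once all work queued on the stream has finished
}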
2 changes: 1 addition & 1 deletion include/mxnet/operator.h
@@ -53,7 +53,7 @@ struct OpContext {
*/
template<typename xpu>
inline mshadow::Stream<xpu>* get_stream() const {
-    return static_cast<mshadow::Stream<xpu>*>(run_ctx.stream);
+    return run_ctx.get_stream<xpu>();
}
};

14 changes: 7 additions & 7 deletions src/kvstore/kvstore_local.h
@@ -19,7 +19,12 @@ namespace mxnet {
*/
class KVStoreLocal : public KVStore {
public:
-  KVStoreLocal() : pinned_ctx_(cpu::kDevMask, Context::kPinnedMemoryID) {
+  KVStoreLocal() {
+#if MXNET_USE_CUDA
+    pinned_ctx_ = Context(cpu::kDevMask, Context::kPinnedMemoryID);
+#else
+    pinned_ctx_ = Context(cpu::kDevMask, 0);
+#endif
Clear();
}

@@ -44,11 +49,7 @@ class KVStoreLocal : public KVStore {
for (size_t i = 0; i < keys.size(); ++i) {
CHECK(local_.find(keys[i]) == local_.end())
<< "duplicate init of key " << keys[i];
-#if MXNET_USE_CUDA
      local_.insert({keys[i], values[i].Copy(pinned_ctx_)});
-#else
-      local_.insert({keys[i], values[i].Copy(local_ctx_)});
-#endif  // MXNET_USE_CUDA
}
}

@@ -121,7 +122,7 @@
CHECK(val.size());
auto& buf = merge_buf_[key];
if (buf.merged.is_none()) {
-      buf.merged = val[0].Copy(local_ctx_);
+      buf.merged = val[0].Copy(pinned_ctx_);
} else {
CopyFromTo(val[0], &buf.merged);
}
@@ -167,7 +168,6 @@
/// \brief local storage
std::unordered_map<int, NArray> local_;

-  Context local_ctx_;
Context pinned_ctx_;

Updater updater_;
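The point of the pinned context: page-locked host memory is what lets host-device copies run asynchronously and overlap with compute, which matters because the kvstore's buffers sit on the CPU between GPU workers. The constructor change simply avoids asking for pinned memory in CUDA-free builds, where it does not exist. A standalone CUDA sketch of the distinction (not mxnet code; names are illustrative):

#include <cuda_runtime.h>

// Stage n floats to the device through a pinned bounce buffer.
void StageToDevice(float* device_ptr, size_t n, cudaStream_t stream) {
  float* pinned = nullptr;
  cudaMallocHost(&pinned, n * sizeof(float));  // page-locked allocation
  // ... fill pinned[0..n) with the values to push ...
  // From pinned memory this copy is truly asynchronous; from pageable
  // memory the driver stages it through an internal buffer synchronously.
  cudaMemcpyAsync(device_ptr, pinned, n * sizeof(float),
                  cudaMemcpyHostToDevice, stream);
  cudaStreamSynchronize(stream);  // don't free the buffer while it's in use
  cudaFreeHost(pinned);
}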
187 changes: 88 additions & 99 deletions src/narray/narray.cc
@@ -5,6 +5,7 @@
*/
#include <dmlc/logging.h>
#include <dmlc/registry.h>
+#include <mxnet/base.h>
#include <mxnet/narray.h>
#include <mshadow/tensor.h>
#include "./narray_function.h"
@@ -42,45 +43,34 @@ inline void BinaryOp(const NArray &lhs,
}
// important: callback must always capture by value
NArray ret = *out;
+  // get the const variables
+  std::vector<Engine::VarHandle> const_vars;
+  if (lhs.ptr_->var != ret.ptr_->var) const_vars.push_back(lhs.ptr_->var);
+  if (rhs.ptr_->var != ret.ptr_->var) const_vars.push_back(rhs.ptr_->var);

// redirect everything to mshadow operations
switch (lhs.ctx().dev_mask) {
case cpu::kDevMask: {
-      auto func = [lhs, rhs, ret](RunContext ctx) {
-          ret.ptr_->CheckAndAlloc();
-          TBlob tmp = ret.data();
-          narray::Eval<cpu, OP>(lhs.data(), rhs.data(), &tmp, ctx);
-        };
-      if (lhs.ptr_->var == ret.ptr_->var && rhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {}, {ret.ptr_->var});
-      } else if (lhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {rhs.ptr_->var}, {ret.ptr_->var});
-      } else if (rhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var}, {ret.ptr_->var});
-      } else {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var, rhs.ptr_->var}, {ret.ptr_->var});
-      }
+      Engine::Get()->Push([lhs, rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<cpu, OP>(lhs.data(), rhs.data(), &tmp, ctx);
+        }, lhs.ctx(), const_vars, {ret.ptr_->var});
break;
}
#if MXNET_USE_CUDA
case gpu::kDevMask: {
-      auto func = [lhs, rhs, ret](RunContext ctx) {
-          ret.ptr_->CheckAndAlloc();
-          TBlob tmp = ret.data();
-          narray::Eval<gpu, OP>(lhs.data(), rhs.data(), &tmp, ctx);
-        };
-      if (lhs.ptr_->var == ret.ptr_->var && rhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {}, {ret.ptr_->var});
-      } else if (lhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {rhs.ptr_->var}, {ret.ptr_->var});
-      } else if (rhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var}, {ret.ptr_->var});
-      } else {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var, rhs.ptr_->var}, {ret.ptr_->var});
-      }
+      Engine::Get()->Push([lhs, rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<gpu, OP>(lhs.data(), rhs.data(), &tmp, ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, lhs.ctx(), const_vars, {ret.ptr_->var});
break;
}
#endif
default: LOG(FATAL) << "GPU is not enabled";
default: LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
}
}
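The const_vars refactor collapses the four-way aliasing branch into one push: a read dependency is declared only when an input variable differs from the output variable, which reproduces exactly the cases the old if/else enumerated. The rule in isolation (a sketch with hypothetical handles a, b, out):

std::vector<Engine::VarHandle> const_vars;
if (a != out) const_vars.push_back(a);
if (b != out) const_vars.push_back(b);
// Push(func, ctx, const_vars, {out}) then covers all four cases:
//   a==out && b==out -> reads {},     writes {out}
//   a==out           -> reads {b},    writes {out}
//   b==out           -> reads {a},    writes {out}
//   otherwise        -> reads {a,b},  writes {out}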

@@ -90,26 +80,26 @@ inline void SetValueOp(const real_t &rhs, NArray *out) {
NArray ret = *out;
switch (ret.ctx().dev_mask) {
case cpu::kDevMask: {
-      auto func = [rhs, ret](RunContext ctx) {
-          ret.ptr_->CheckAndAlloc();
-          TBlob tmp = ret.data();
-          narray::Eval<cpu>(rhs, &tmp, ctx);
-        };
-      Engine::Get()->Push(func, ret.ctx(), {}, {ret.ptr_->var});
+      Engine::Get()->Push([rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<cpu>(rhs, &tmp, ctx);
+        }, ret.ctx(), {}, {ret.ptr_->var});
break;
}
#if MXNET_USE_CUDA
case gpu::kDevMask: {
-      auto func = [rhs, ret](RunContext ctx) {
-          ret.ptr_->CheckAndAlloc();
-          TBlob tmp = ret.data();
-          narray::Eval<gpu>(rhs, &tmp, ctx);
-        };
-      Engine::Get()->Push(func, ret.ctx(), {}, {ret.ptr_->var});
+      Engine::Get()->Push([rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<gpu>(rhs, &tmp, ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, ret.ctx(), {}, {ret.ptr_->var});
break;
}
#endif
-    default: LOG(FATAL) << "GPU is not enabled";
+    default: LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
}
}
/*!
@@ -124,45 +114,40 @@ inline void ScalarOp(const NArray &lhs,
const real_t &rhs,
NArray *out) {
if (out->is_none()) {
-    *out = NArray(OP::GetShape(lhs.shape(), lhs.shape()), lhs.ctx(), true);
+    *out = NArray(lhs.shape(), lhs.ctx(), true);
} else {
CHECK(out->ctx() == lhs.ctx()) << "target context mismatch";
-    CHECK(out->shape() == OP::GetShape(lhs.shape(), lhs.shape()))
-        << "target shape mismatch";
+    CHECK(out->shape() == lhs.shape()) << "target shape mismatch";
}
// important: callback must always capture by value
NArray ret = *out;
+  // get the const variables
+  std::vector<Engine::VarHandle> const_vars;
+  if (lhs.ptr_->var != ret.ptr_->var) const_vars.push_back(lhs.ptr_->var);

// redirect everything to mshadow operations
switch (lhs.ctx().dev_mask) {
case cpu::kDevMask: {
-      auto func = [lhs, rhs, ret](RunContext ctx) {
-          ret.ptr_->CheckAndAlloc();
-          TBlob tmp = ret.data();
-          narray::Eval<cpu, OP, reverse>(lhs.data(), rhs, &tmp, ctx);
-        };
-      if (lhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {}, {ret.ptr_->var});
-      } else {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var}, {ret.ptr_->var});
-      }
+      Engine::Get()->Push([lhs, rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<cpu, OP, reverse>(lhs.data(), rhs, &tmp, ctx);
+        }, lhs.ctx(), const_vars, {ret.ptr_->var});
break;
}
#if MXNET_USE_CUDA
case gpu::kDevMask: {
-      auto func = [lhs, rhs, ret](RunContext ctx) {
-          ret.ptr_->CheckAndAlloc();
-          TBlob tmp = ret.data();
-          narray::Eval<gpu, OP, reverse>(lhs.data(), rhs, &tmp, ctx);
-        };
-      if (lhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {}, {ret.ptr_->var});
-      } else {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var}, {ret.ptr_->var});
-      }
+      Engine::Get()->Push([lhs, rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<gpu, OP, reverse>(lhs.data(), rhs, &tmp, ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, lhs.ctx(), const_vars, {ret.ptr_->var});
break;
}
#endif
-    default: LOG(FATAL) << "GPU is not enabled";
+    default: LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
}
}

@@ -175,48 +160,52 @@ void CopyFromTo(const NArray &from, NArray *to) {
NArray ret = *to;
int a = from.ctx().dev_mask;
int b = to->ctx().dev_mask;

+  std::vector<Engine::VarHandle> const_vars;
+  if (from.ptr_->var != ret.ptr_->var) const_vars.push_back(from.ptr_->var);

if (a == cpu::kDevMask && b == cpu::kDevMask) {
Engine::Get()->Push([from, ret](RunContext ctx) {
ret.ptr_->CheckAndAlloc();
TBlob tmp = ret.data();
narray::Copy<cpu, cpu>(from.data(), &tmp,
from.ctx(), ret.ctx(), ctx);
-      }, from.ctx(), {from.ptr_->var}, {ret.ptr_->var});
-  } else if (a == cpu::kDevMask && b == gpu::kDevMask) {
-#if MXNET_USE_CUDA
-    Engine::Get()->Push([from, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Copy<cpu, gpu>(from.data(), &tmp,
-                               from.ctx(), ret.ctx(), ctx);
-      }, ret.ctx(), {from.ptr_->var}, {ret.ptr_->var});
-#else
-    LOG(FATAL) << "GPU is not enabled";
-#endif
-  } else if (a == gpu::kDevMask && b == cpu::kDevMask) {
-#if MXNET_USE_CUDA
-    Engine::Get()->Push([from, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Copy<gpu, cpu>(from.data(), &tmp,
-                               from.ctx(), ret.ctx(), ctx);
-      }, from.ctx(), {from.ptr_->var}, {ret.ptr_->var});
-#else
-    LOG(FATAL) << "GPU is not enabled";
-#endif
-  } else if (a == gpu::kDevMask && b == gpu::kDevMask) {
+      }, from.ctx(), const_vars, {ret.ptr_->var});
+  } else {
#if MXNET_USE_CUDA
-    Engine::Get()->Push([from, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Copy<gpu, gpu>(from.data(), &tmp,
-                               from.ctx(), ret.ctx(), ctx);
-      }, from.ctx(), {from.ptr_->var}, {ret.ptr_->var});
+    if (a == cpu::kDevMask && b == gpu::kDevMask) {
+      Engine::Get()->Push([from, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Copy<cpu, gpu>(from.data(), &tmp,
+                                 from.ctx(), ret.ctx(), ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, ret.ctx(), const_vars, {ret.ptr_->var});
+    } else if (a == gpu::kDevMask && b == cpu::kDevMask) {
+      Engine::Get()->Push([from, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Copy<gpu, cpu>(from.data(), &tmp,
+                                 from.ctx(), ret.ctx(), ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, from.ctx(), const_vars, {ret.ptr_->var});
+    } else if (a == gpu::kDevMask && b == gpu::kDevMask) {
+      Engine::Get()->Push([from, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Copy<gpu, gpu>(from.data(), &tmp,
+                                 from.ctx(), ret.ctx(), ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, from.ctx(), const_vars, {ret.ptr_->var});
+    } else {
+      LOG(FATAL) << "unknown device mask";
+    }
#else
LOG(FATAL) << "GPU is not enabled";
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
-  } else {
-    LOG(FATAL) << "unknown device mask";
}
}
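Restructuring CopyFromTo this way leaves a CUDA-free build with a single failure path under the #else, and gives every GPU-involving copy the same stream wait as the compute kernels. A hedged usage sketch (shape, gpu_ctx, and cpu_ctx are hypothetical values constructed elsewhere):

NArray device_arr(shape, gpu_ctx);
NArray host_arr(shape, cpu_ctx);
CopyFromTo(device_arr, &host_arr);  // enqueued on from.ctx(), i.e. the GPU
Engine::Get()->WaitForAll();        // block until the copy has really landed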

19 changes: 17 additions & 2 deletions src/symbol/graph_executor.cc
@@ -169,7 +169,6 @@ inline std::vector<std::pair<T, T> > GraphExecutor::GetInplaceOption(
inline GraphExecutor::OpExecEntry
GraphExecutor::GetOpExecEntry(uint32_t nid) {
OpNode& op_node = op_nodes_[nid];
-  Operator *op = op_node.op.get();
std::vector<OpReqType> req;
std::vector<TBlob> in_data, out_data, aux_states;
in_data.reserve(graph_.nodes[nid].inputs.size());
@@ -199,14 +198,30 @@ GraphExecutor::GetOpExecEntry(uint32_t nid) {
}
}

+  // start setup exec function.
+  Operator* op = op_node.op.get();
OpContext* op_ctx_ptr = &op_node.op_ctx;
-  exec.exec_fun = [op, op_ctx_ptr, in_data, req, out_data, aux_states] (RunContext ctx) {
+  bool is_gpu = op_node.ctx.dev_mask == gpu::kDevMask;
+  exec.exec_fun = [op, is_gpu, op_ctx_ptr, in_data, req, out_data, aux_states] (RunContext ctx) {
op_ctx_ptr->run_ctx = ctx;
op->Forward(*op_ctx_ptr, in_data, req, out_data, aux_states);
+    if (is_gpu) {
+#if MXNET_USE_CUDA
+      // Wait GPU kernel to finish.
+      ctx.get_stream<gpu>()->Wait();
+#else
+      LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
+#endif
+    }
};
return exec;
}

+GraphExecutor::~GraphExecutor() {
+  // need to destruct after all previously issued operations are finished.
+  Engine::Get()->WaitForAll();
+}

void GraphExecutor::InitGraph(Symbol symbol, Context ctx, bool need_backward) {
// initialize all internal data structures
symbol.ToStaticGraph(&graph_);
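Two details worth noting here: dev_mask is read once at setup time and captured as a plain bool, so the per-execution lambda does no context lookups; and the new out-of-line destructor drains the engine, because pushed exec functions capture a raw pointer into op_node.op_ctx and must not outlive the executor. A condensed sketch of that lifetime rule (ExampleExecutor is hypothetical):

class ExampleExecutor {
 public:
  ~ExampleExecutor() {
    // Pushed functions hold &op_ctx_; wait for all of them to retire
    // before the member is destroyed along with this object.
    Engine::Get()->WaitForAll();
  }
 private:
  OpContext op_ctx_;  // captured by pointer in exec functions
};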
2 changes: 1 addition & 1 deletion src/symbol/graph_executor.h
@@ -19,7 +19,7 @@ namespace mxnet {
*/
class GraphExecutor : public Executor {
public:
-  virtual ~GraphExecutor() {}
+  virtual ~GraphExecutor();
virtual void Forward(bool is_train);
virtual void Backward(const std::vector<NArray> &head_grads);
virtual const std::vector<NArray> &heads() const {