Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Commit

Permalink
Minor refactor: prevent string copying, list -> vector, shared_ptr by…
Browse files Browse the repository at this point in the history
… ref
  • Loading branch information
Pedro Larroy committed Dec 7, 2017
1 parent 7484437 commit fb0e257
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 61 deletions.
12 changes: 6 additions & 6 deletions include/mxnet/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
/*!
* Copyright (c) 2015 by Contributors
* \file base.h
* \brief configuation of mxnet as well as basic data structure.
* \brief configuration of MXNet as well as basic data structure.
*/
#ifndef MXNET_BASE_H_
#define MXNET_BASE_H_
Expand Down Expand Up @@ -243,7 +243,7 @@ struct Context {
* \param str the string pattern
* \return Context
*/
inline static Context FromString(std::string str);
inline static Context FromString(const std::string& str);
};

/*!
Expand Down Expand Up @@ -316,15 +316,15 @@ inline Context Context::GPU(int32_t dev_id) {
return Create(kGPU, dev_id);
}

inline Context Context::FromString(std::string str) {
inline Context Context::FromString(const std::string& str) {
Context ret;
try {
std::string::size_type l = str.find('(');
const std::string::size_type l = str.find('(');
CHECK_NE(l, std::string::npos);
std::string::size_type r = str.find(')');
const std::string::size_type r = str.find(')');
CHECK_EQ(r, str.length()-1);

std::string type = str.substr(0, l);
const std::string type = str.substr(0, l);
int id = std::stoi(str.substr(l+1, r-l-1));
if (type == "cpu") {
ret = CPU(id);
Expand Down
1 change: 0 additions & 1 deletion src/c_api/c_api_ndarray.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ int MXInvokeCachedOp(CachedOpHandle handle,
NDArrayHandle *inputs,
int *num_outputs,
NDArrayHandle **outputs) {
static const auto cached_op = nnvm::Op::Get("_CachedOp");
MXAPIThreadLocalEntry *ret = MXAPIThreadLocalStore::Get();

API_BEGIN();
Expand Down
4 changes: 2 additions & 2 deletions src/engine/stream_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ RunContext StreamManager<kNumGpus, kStreams>::GetRunContext(
std::size_t use_counter;
CUDA_CALL(cudaSetDevice(ctx.dev_id));
{
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
auto&& counter = gpu_cnt_.at(ctx.dev_id);
if (counter == -1) {
for (auto&& i : gpu_streams_.at(ctx.dev_id)) {
Expand Down Expand Up @@ -109,7 +109,7 @@ RunContext StreamManager<kNumGpus, kStreams>::GetIORunContext(
#if MXNET_USE_CUDA
CUDA_CALL(cudaSetDevice(ctx.dev_id));
{
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
if (gpu_io_streams_.at(ctx.dev_id) == nullptr) {
gpu_io_streams_.at(ctx.dev_id) = mshadow::NewStream<gpu>(false, false, ctx.dev_id);
}
Expand Down
12 changes: 6 additions & 6 deletions src/engine/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <dmlc/base.h>
#include <cstddef>
#include <vector>
#include <list>
#include <thread>
#include <utility>
#include "mxnet/base.h"
Expand Down Expand Up @@ -58,8 +57,8 @@ class ThreadPool {

/*! \brief Signal event upon destruction, even for exceptions (RAII) */
struct SetReadyOnDestroy {
explicit inline SetReadyOnDestroy(std::shared_ptr<SimpleEvent> *event)
: event_(*event) {
explicit inline SetReadyOnDestroy(const std::shared_ptr<SimpleEvent>& event)
: event_(event) {
}
inline ~SetReadyOnDestroy() {
if (event_) {
Expand Down Expand Up @@ -87,9 +86,10 @@ class ThreadPool {
}
}
explicit ThreadPool(size_t size,
std::function<void(std::shared_ptr<SimpleEvent> ready)> func,
std::function<void(const std::shared_ptr<SimpleEvent>& ready)> func,
const bool wait)
: worker_threads_(size) {
ready_events_.reserve(size);
for (auto& i : worker_threads_) {
std::shared_ptr<SimpleEvent> ptr = std::make_shared<SimpleEvent>();
ready_events_.emplace_back(ptr);
Expand All @@ -110,7 +110,7 @@ class ThreadPool {
* \brief Wait for all started threads to signal that they're ready
*/
void WaitForReady() {
for (std::shared_ptr<SimpleEvent> ptr : ready_events_) {
for (std::shared_ptr<SimpleEvent>& ptr : ready_events_) {
ptr->wait();
}
}
Expand All @@ -122,7 +122,7 @@ class ThreadPool {
/*!
* \brief Startup synchronization objects
*/
std::list<std::shared_ptr<SimpleEvent>> ready_events_;
std::vector<std::shared_ptr<SimpleEvent> > ready_events_;
/*!
* \brief Disallow default construction.
*/
Expand Down
22 changes: 11 additions & 11 deletions src/engine/threaded_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ ThreadedVar::ThreadedVar(VersionedVarBlock* head) : head_{head} {
}

inline void ThreadedVar::AppendReadDependency(OprBlock* opr_block) {
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
if (pending_write_ == nullptr) {
// invariant: is_ready_to_read()
CHECK_GE(num_pending_reads_, 0);
Expand All @@ -71,7 +71,7 @@ inline void ThreadedVar::AppendReadDependency(OprBlock* opr_block) {

inline void ThreadedVar::AppendWriteDependency(OprBlock* opr_block) {
auto&& new_var_block = VersionedVarBlock::New();
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
// invariant.
assert(head_->next == nullptr);
assert(head_->trigger == nullptr);
Expand Down Expand Up @@ -102,7 +102,7 @@ inline void ThreadedVar::CompleteReadDependency(Dispatcher dispatcher) {
OprBlock *trigger = nullptr;
{
// this is lock scope
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
CHECK_GT(num_pending_reads_, 0);

if (--num_pending_reads_ == 0) {
Expand All @@ -124,7 +124,7 @@ inline bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
VersionedVarBlock *old_pending_write, *end_of_read_chain;
OprBlock* trigger_write = nullptr;
{
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
// invariants
assert(head_->next == nullptr);
assert(pending_write_ != nullptr);
Expand Down Expand Up @@ -187,12 +187,12 @@ inline bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
}

inline void ThreadedVar::SetToDelete() {
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
to_delete_ = true;
}

inline bool ThreadedVar::ready_to_read() {
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
return this->is_ready_to_read();
}

Expand Down Expand Up @@ -228,8 +228,8 @@ void ThreadedEngine::CheckDuplicate(std::vector<VarHandle> const& const_vars,
// Check for duplicates.
auto use = const_vars;
auto mutate = mutable_vars;
auto use_size = use.size();
auto mutate_size = mutate.size();
const size_t use_size = use.size();
const size_t mutate_size = mutate.size();
std::sort(use.begin(), use.end());
std::sort(mutate.begin(), mutate.end());
for (std::size_t i = 0; i < use_size; ++i) {
Expand Down Expand Up @@ -403,11 +403,11 @@ inline void ThreadedEngine::OnComplete(ThreadedOpr* threaded_opr) {
}
// Mark complete for write variables.
for (auto&& i : threaded_opr->mutable_vars) {
bool debug_info = (engine_info_ && debug_wait_var_ == i);
const bool debug_info = (engine_info_ && debug_wait_var_ == i);
if (debug_info) {
LOG(INFO) << "Complete write dep for " << i;
}
bool to_delete = i->CompleteWriteDependency(
const bool to_delete = i->CompleteWriteDependency(
[this, debug_info](OprBlock* opr) {
if (debug_info) {
LOG(INFO) << "PushToExecute " << opr;
Expand All @@ -426,7 +426,7 @@ inline void ThreadedEngine::OnComplete(ThreadedOpr* threaded_opr) {
// could execute right after we mark all vars as complete, so if
// threaded_opr is not temporary, its value is not reliable
// anymore start from here.
int npending;
int npending = 0;
{
std::unique_lock<std::mutex> lock{finished_m_};
npending = --pending_;
Expand Down
12 changes: 6 additions & 6 deletions src/engine/threaded_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ struct OprBlock : public common::ObjectPoolAllocatable<OprBlock> {
* \return the wait counter after the decreasement.
*/
inline int decr_wait() {
// chack invariant, avoid over trigger
int ret = --wait;
// check invariant, avoid over trigger
const int ret = --wait;
CHECK_GE(ret, 0);
return ret;
}
Expand All @@ -112,8 +112,8 @@ struct VersionedVarBlock
* \brief Variable implementation.
* Each ThreadedVar is a linked list(queue) of operations to be performed.
*/
class ThreadedVar final : public Var,
public common::ObjectPoolAllocatable<ThreadedVar> {
class ThreadedVar final
: public Var, public common::ObjectPoolAllocatable<ThreadedVar> {
public:
/*!
* \brief constructor
Expand Down Expand Up @@ -180,7 +180,7 @@ class ThreadedVar final : public Var,
// TODO(hotpxl) change this to spinlock for faster runtime
// TODO(hotpxl) consider rename head
/*! \brief inetrnal mutex of the ThreadedVar */
std::mutex m_;
std::mutex mutex_;
/*!
* \brief number of pending reads operation in the variable.
* will be marked as -1 when there is a already triggered pending write.
Expand Down Expand Up @@ -446,7 +446,7 @@ class ThreadedEngine : public Engine {
if (!bulk_status.count) return;
bulk_status.count = 0;
DeduplicateVarHandle(&bulk_status.const_vars, &bulk_status.mutable_vars);
auto fn = std::move(bulk_status.fn);
SyncFn fn = std::move(bulk_status.fn);
this->PushAsync([fn](RunContext ctx, CallbackOnComplete on_complete) {
fn(ctx);
on_complete();
Expand Down
55 changes: 26 additions & 29 deletions src/engine/threaded_engine_perdevice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,14 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
if (opr_block->opr->prop == FnProperty::kCPUPrioritized) {
cpu_priority_worker_->task_queue.Push(opr_block, opr_block->priority);
} else {
int dev_id = ctx.dev_id;
int nthread = cpu_worker_nthreads_;
auto ptr =
cpu_normal_workers_.Get(dev_id, [this, ctx, nthread]() {
auto blk = new ThreadWorkerBlock<kWorkerQueue>();
blk->pool.reset(new ThreadPool(nthread, [this, ctx, blk] () {
this->CPUWorker(ctx, blk);
}));
return blk;
});
const size_t nthreads = cpu_worker_nthreads_;
auto ptr = cpu_normal_workers_.Get(ctx.dev_id, [this, ctx, nthreads]() {
auto blk = new ThreadWorkerBlock<kWorkerQueue>();
blk->pool.reset(new ThreadPool(nthreads, [this, ctx, blk]() {
this->CPUWorker(ctx, blk);
}));
return blk;
});
if (ptr) {
if (opr_block->opr->prop == FnProperty::kDeleteVar) {
ptr->task_queue.PushFront(opr_block, opr_block->priority);
Expand All @@ -138,24 +136,23 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
} else {
CHECK_EQ(ctx.dev_mask(), Context::kGPU);
// GPU execution.
FnProperty prop = opr_block->opr->prop;
bool is_copy = (prop == FnProperty::kCopyFromGPU ||
prop == FnProperty::kCopyToGPU);
int nthread = gpu_worker_nthreads_;
const FnProperty prop = opr_block->opr->prop;
const bool is_copy = (prop == FnProperty::kCopyFromGPU ||
prop == FnProperty::kCopyToGPU);
const size_t nthread = gpu_worker_nthreads_;
if (is_copy) {
auto ptr =
gpu_copy_workers_.Get(ctx.dev_id, [this, ctx, is_copy, nthread]() {
auto ptr = gpu_copy_workers_.Get(ctx.dev_id, [this, ctx, is_copy, nthread]() {
// Signify to kernel that GPU is being used, so reserve cores as necessary
OpenMP::Get()->set_reserve_cores(GetReserveCoreCount(true));
auto blk = new ThreadWorkerBlock<kCopyQueue>();
blk->pool.reset(new ThreadPool(
nthread,
[this, ctx, is_copy, blk]
(std::shared_ptr<ThreadPool::SimpleEvent> ready_event) {
this->GPUWorker(ctx, is_copy, blk, ready_event);
}, true));
return blk;
});
blk->pool.reset(new ThreadPool(
nthread,
[this, ctx, is_copy, blk]
(std::shared_ptr<ThreadPool::SimpleEvent> ready_event) {
this->GPUWorker(ctx, is_copy, blk, ready_event);
}, true));
return blk;
});
if (ptr) {
if (opr_block->opr->prop == FnProperty::kDeleteVar) {
ptr->task_queue.PushFront(opr_block, opr_block->priority);
Expand All @@ -175,7 +172,7 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
this->GPUWorker(ctx, is_copy, blk, ready_event);
}, true));
return blk;
});
});
if (ptr) {
if (opr_block->opr->prop == FnProperty::kDeleteVar) {
ptr->task_queue.PushFront(opr_block, opr_block->priority);
Expand Down Expand Up @@ -205,9 +202,9 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
/*! \brief whether this is a worker thread. */
static MX_THREAD_LOCAL bool is_worker_;
/*! \brief number of concurrent thread cpu worker uses */
int cpu_worker_nthreads_;
size_t cpu_worker_nthreads_;
/*! \brief number of concurrent thread each gpu worker uses */
int gpu_worker_nthreads_;
size_t gpu_worker_nthreads_;
// cpu worker
common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > cpu_normal_workers_;
// cpu priority worker
Expand All @@ -226,12 +223,12 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
inline void GPUWorker(Context ctx,
bool is_copy_worker,
ThreadWorkerBlock<type> *block,
std::shared_ptr<ThreadPool::SimpleEvent> ready_event) {
const std::shared_ptr<ThreadPool::SimpleEvent>& ready_event) {
this->is_worker_ = true;
#if MXNET_USE_CUDA
mshadow::Stream<gpu> *stream;
do {
ThreadPool::SimpleEvent::SetReadyOnDestroy setReady(&ready_event);
ThreadPool::SimpleEvent::SetReadyOnDestroy setReady(ready_event);
// allocate stream
mshadow::SetDevice<gpu>(ctx.dev_id);
if (is_copy_worker) {
Expand Down

0 comments on commit fb0e257

Please sign in to comment.