From 3b192702d0e6b860f35d7b6fe448f9e583e358cf Mon Sep 17 00:00:00 2001 From: Pedro Larroy Date: Wed, 29 Nov 2017 17:50:47 +0100 Subject: [PATCH] Minor refactor: prevent string copying, list -> vector, shared_ptr by ref --- include/mxnet/base.h | 12 +++--- src/common/utils.h | 4 +- src/engine/stream_manager.h | 6 +-- src/engine/thread_pool.h | 14 ++++--- src/engine/threaded_engine.cc | 24 +++++------ src/engine/threaded_engine.h | 12 +++--- src/engine/threaded_engine_perdevice.cc | 53 ++++++++++++------------- 7 files changed, 62 insertions(+), 63 deletions(-) diff --git a/include/mxnet/base.h b/include/mxnet/base.h index 84b2fea7129c..f7b41e58988e 100644 --- a/include/mxnet/base.h +++ b/include/mxnet/base.h @@ -20,7 +20,7 @@ /*! * Copyright (c) 2015 by Contributors * \file base.h - * \brief configuation of mxnet as well as basic data structure. + * \brief configuration of MXNet as well as basic data structure. */ #ifndef MXNET_BASE_H_ #define MXNET_BASE_H_ @@ -243,7 +243,7 @@ struct Context { * \param str the string pattern * \return Context */ - inline static Context FromString(std::string str); + inline static Context FromString(const std::string& str); }; /*! @@ -316,15 +316,15 @@ inline Context Context::GPU(int32_t dev_id) { return Create(kGPU, dev_id); } -inline Context Context::FromString(std::string str) { +inline Context Context::FromString(const std::string& str) { Context ret; try { - std::string::size_type l = str.find('('); + const std::string::size_type l = str.find('('); CHECK_NE(l, std::string::npos); - std::string::size_type r = str.find(')'); + const std::string::size_type r = str.find(')'); CHECK_EQ(r, str.length()-1); - std::string type = str.substr(0, l); + const std::string type = str.substr(0, l); int id = std::stoi(str.substr(l+1, r-l-1)); if (type == "cpu") { ret = CPU(id); diff --git a/src/common/utils.h b/src/common/utils.h index 038ab2a04721..91764509ae17 100644 --- a/src/common/utils.h +++ b/src/common/utils.h @@ -340,7 +340,7 @@ inline std::string stype_string(const int x) { } // heuristic to dermine number of threads per GPU -inline int GetNumThreadPerGPU() { +inline int GetNumThreadsPerGPU() { // This is resource efficient option. return dmlc::GetEnv("MXNET_GPU_WORKER_NTHREADS", 2); } @@ -350,7 +350,7 @@ inline int GetNumThreadPerGPU() { inline int GetExecNumMatchColor() { // This is resource efficient option. int num_match_color = dmlc::GetEnv("MXNET_EXEC_NUM_TEMP", 1); - return std::min(num_match_color, GetNumThreadPerGPU()); + return std::min(num_match_color, GetNumThreadsPerGPU()); } template diff --git a/src/engine/stream_manager.h b/src/engine/stream_manager.h index 432bccf27df4..ddbfde81f055 100644 --- a/src/engine/stream_manager.h +++ b/src/engine/stream_manager.h @@ -51,7 +51,7 @@ class StreamManager { RunContext GetIORunContext(Context const& ctx); void Finalize(); private: - std::mutex m_; + std::mutex mutex_; #if MXNET_USE_CUDA std::array*, kStreams>, kNumGpus> gpu_streams_; @@ -74,7 +74,7 @@ RunContext StreamManager::GetRunContext( std::size_t use_counter; CUDA_CALL(cudaSetDevice(ctx.dev_id)); { - std::lock_guard lock{m_}; + std::lock_guard lock{mutex_}; auto&& counter = gpu_cnt_.at(ctx.dev_id); if (counter == -1) { for (auto&& i : gpu_streams_.at(ctx.dev_id)) { @@ -109,7 +109,7 @@ RunContext StreamManager::GetIORunContext( #if MXNET_USE_CUDA CUDA_CALL(cudaSetDevice(ctx.dev_id)); { - std::lock_guard lock{m_}; + std::lock_guard lock{mutex_}; if (gpu_io_streams_.at(ctx.dev_id) == nullptr) { gpu_io_streams_.at(ctx.dev_id) = mshadow::NewStream(false, false, ctx.dev_id); } diff --git a/src/engine/thread_pool.h b/src/engine/thread_pool.h index b4dae6bfd41d..09edaa6263fe 100644 --- a/src/engine/thread_pool.h +++ b/src/engine/thread_pool.h @@ -26,7 +26,6 @@ #include #include #include -#include #include #include #include "mxnet/base.h" @@ -58,8 +57,8 @@ class ThreadPool { /*! \brief Signal event upon destruction, even for exceptions (RAII) */ struct SetReadyOnDestroy { - explicit inline SetReadyOnDestroy(std::shared_ptr *event) - : event_(*event) { + explicit inline SetReadyOnDestroy(const std::shared_ptr& event) + : event_(event) { } inline ~SetReadyOnDestroy() { if (event_) { @@ -82,14 +81,17 @@ class ThreadPool { */ explicit ThreadPool(size_t size, std::function func) : worker_threads_(size) { + CHECK_GT(size, 0); for (auto& i : worker_threads_) { i = std::thread(func); } } explicit ThreadPool(size_t size, - std::function ready)> func, + std::function& ready)> func, const bool wait) : worker_threads_(size) { + CHECK_GT(size, 0); + ready_events_.reserve(size); for (auto& i : worker_threads_) { std::shared_ptr ptr = std::make_shared(); ready_events_.emplace_back(ptr); @@ -110,7 +112,7 @@ class ThreadPool { * \brief Wait for all started threads to signal that they're ready */ void WaitForReady() { - for (std::shared_ptr ptr : ready_events_) { + for (std::shared_ptr& ptr : ready_events_) { ptr->wait(); } } @@ -122,7 +124,7 @@ class ThreadPool { /*! * \brief Startup synchronization objects */ - std::list> ready_events_; + std::vector > ready_events_; /*! * \brief Disallow default construction. */ diff --git a/src/engine/threaded_engine.cc b/src/engine/threaded_engine.cc index b17d92863725..2b28a7d602a3 100644 --- a/src/engine/threaded_engine.cc +++ b/src/engine/threaded_engine.cc @@ -49,7 +49,7 @@ ThreadedVar::ThreadedVar(VersionedVarBlock* head) : head_{head} { } inline void ThreadedVar::AppendReadDependency(OprBlock* opr_block) { - std::lock_guard lock{m_}; + std::lock_guard lock{mutex_}; if (pending_write_ == nullptr) { // invariant: is_ready_to_read() CHECK_GE(num_pending_reads_, 0); @@ -71,7 +71,7 @@ inline void ThreadedVar::AppendReadDependency(OprBlock* opr_block) { inline void ThreadedVar::AppendWriteDependency(OprBlock* opr_block) { auto&& new_var_block = VersionedVarBlock::New(); - std::lock_guard lock{m_}; + std::lock_guard lock{mutex_}; // invariant. assert(head_->next == nullptr); assert(head_->trigger == nullptr); @@ -102,7 +102,7 @@ inline void ThreadedVar::CompleteReadDependency(Dispatcher dispatcher) { OprBlock *trigger = nullptr; { // this is lock scope - std::lock_guard lock{m_}; + std::lock_guard lock{mutex_}; CHECK_GT(num_pending_reads_, 0); if (--num_pending_reads_ == 0) { @@ -124,7 +124,7 @@ inline bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) { VersionedVarBlock *old_pending_write, *end_of_read_chain; OprBlock* trigger_write = nullptr; { - std::lock_guard lock{m_}; + std::lock_guard lock{mutex_}; // invariants assert(head_->next == nullptr); assert(pending_write_ != nullptr); @@ -187,12 +187,12 @@ inline bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) { } inline void ThreadedVar::SetToDelete() { - std::lock_guard lock{m_}; + std::lock_guard lock{mutex_}; to_delete_ = true; } inline bool ThreadedVar::ready_to_read() { - std::lock_guard lock{m_}; + std::lock_guard lock{mutex_}; return this->is_ready_to_read(); } @@ -228,8 +228,8 @@ void ThreadedEngine::CheckDuplicate(std::vector const& const_vars, // Check for duplicates. auto use = const_vars; auto mutate = mutable_vars; - auto use_size = use.size(); - auto mutate_size = mutate.size(); + const size_t use_size = use.size(); + const size_t mutate_size = mutate.size(); std::sort(use.begin(), use.end()); std::sort(mutate.begin(), mutate.end()); for (std::size_t i = 0; i < use_size; ++i) { @@ -381,7 +381,7 @@ void ThreadedEngine::WaitForVar(VarHandle var) { std::unique_lock lock{finished_m_}; finished_cv_.wait(lock, [this, &done]() { return done.load() || kill_.load(); - }); + }); } } @@ -403,11 +403,11 @@ inline void ThreadedEngine::OnComplete(ThreadedOpr* threaded_opr) { } // Mark complete for write variables. for (auto&& i : threaded_opr->mutable_vars) { - bool debug_info = (engine_info_ && debug_wait_var_ == i); + const bool debug_info = (engine_info_ && debug_wait_var_ == i); if (debug_info) { LOG(INFO) << "Complete write dep for " << i; } - bool to_delete = i->CompleteWriteDependency( + const bool to_delete = i->CompleteWriteDependency( [this, debug_info](OprBlock* opr) { if (debug_info) { LOG(INFO) << "PushToExecute " << opr; @@ -426,7 +426,7 @@ inline void ThreadedEngine::OnComplete(ThreadedOpr* threaded_opr) { // could execute right after we mark all vars as complete, so if // threaded_opr is not temporary, its value is not reliable // anymore start from here. - int npending; + int npending = 0; { std::unique_lock lock{finished_m_}; npending = --pending_; diff --git a/src/engine/threaded_engine.h b/src/engine/threaded_engine.h index d85321c52c9f..1524f257560f 100644 --- a/src/engine/threaded_engine.h +++ b/src/engine/threaded_engine.h @@ -85,8 +85,8 @@ struct OprBlock : public common::ObjectPoolAllocatable { * \return the wait counter after the decreasement. */ inline int decr_wait() { - // chack invariant, avoid over trigger - int ret = --wait; + // check invariant, avoid over trigger + const int ret = --wait; CHECK_GE(ret, 0); return ret; } @@ -112,8 +112,8 @@ struct VersionedVarBlock * \brief Variable implementation. * Each ThreadedVar is a linked list(queue) of operations to be performed. */ -class ThreadedVar final : public Var, - public common::ObjectPoolAllocatable { +class ThreadedVar final + : public Var, public common::ObjectPoolAllocatable { public: /*! * \brief constructor @@ -180,7 +180,7 @@ class ThreadedVar final : public Var, // TODO(hotpxl) change this to spinlock for faster runtime // TODO(hotpxl) consider rename head /*! \brief inetrnal mutex of the ThreadedVar */ - std::mutex m_; + std::mutex mutex_; /*! * \brief number of pending reads operation in the variable. * will be marked as -1 when there is a already triggered pending write. @@ -446,7 +446,7 @@ class ThreadedEngine : public Engine { if (!bulk_status.count) return; bulk_status.count = 0; DeduplicateVarHandle(&bulk_status.const_vars, &bulk_status.mutable_vars); - auto fn = std::move(bulk_status.fn); + SyncFn fn = std::move(bulk_status.fn); this->PushAsync([fn](RunContext ctx, CallbackOnComplete on_complete) { fn(ctx); on_complete(); diff --git a/src/engine/threaded_engine_perdevice.cc b/src/engine/threaded_engine_perdevice.cc index e7e222f6cbe3..2b06f619374a 100644 --- a/src/engine/threaded_engine_perdevice.cc +++ b/src/engine/threaded_engine_perdevice.cc @@ -89,7 +89,7 @@ class ThreadedEnginePerDevice : public ThreadedEngine { void Start() override { if (is_worker_) return; - gpu_worker_nthreads_ = common::GetNumThreadPerGPU(); + gpu_worker_nthreads_ = common::GetNumThreadsPerGPU(); cpu_worker_nthreads_ = dmlc::GetEnv("MXNET_CPU_WORKER_NTHREADS", 1); // create CPU task int cpu_priority_nthreads = dmlc::GetEnv("MXNET_CPU_PRIORITY_NTHREADS", 4); @@ -118,17 +118,15 @@ class ThreadedEnginePerDevice : public ThreadedEngine { if (opr_block->opr->prop == FnProperty::kCPUPrioritized) { cpu_priority_worker_->task_queue.Push(opr_block, opr_block->priority); } else { - int dev_id = ctx.dev_id; - int nthread = cpu_worker_nthreads_; - auto ptr = - cpu_normal_workers_.Get(dev_id, [this, ctx, nthread]() { - auto blk = new ThreadWorkerBlock(); - blk->pool.reset(new ThreadPool(nthread, + const size_t nthreads = cpu_worker_nthreads_; + auto ptr = cpu_normal_workers_.Get(ctx.dev_id, [this, ctx, nthreads]() { + auto blk = new ThreadWorkerBlock(); + blk->pool.reset(new ThreadPool(nthreads, [this, ctx, blk](std::shared_ptr ready_event) { this->CPUWorker(ctx, blk, ready_event); }, true)); - return blk; - }); + return blk; + }); if (ptr) { if (opr_block->opr->prop == FnProperty::kDeleteVar) { ptr->task_queue.PushFront(opr_block, opr_block->priority); @@ -140,24 +138,23 @@ class ThreadedEnginePerDevice : public ThreadedEngine { } else { CHECK_EQ(ctx.dev_mask(), Context::kGPU); // GPU execution. - FnProperty prop = opr_block->opr->prop; - bool is_copy = (prop == FnProperty::kCopyFromGPU || - prop == FnProperty::kCopyToGPU); - int nthread = gpu_worker_nthreads_; + const FnProperty prop = opr_block->opr->prop; + const bool is_copy = (prop == FnProperty::kCopyFromGPU || + prop == FnProperty::kCopyToGPU); + const size_t nthread = gpu_worker_nthreads_; if (is_copy) { - auto ptr = - gpu_copy_workers_.Get(ctx.dev_id, [this, ctx, is_copy, nthread]() { + auto ptr = gpu_copy_workers_.Get(ctx.dev_id, [this, ctx, is_copy, nthread]() { // Signify to kernel that GPU is being used, so reserve cores as necessary OpenMP::Get()->set_reserve_cores(GetReserveCoreCount(true)); auto blk = new ThreadWorkerBlock(); - blk->pool.reset(new ThreadPool( - nthread, - [this, ctx, is_copy, blk] - (std::shared_ptr ready_event) { - this->GPUWorker(ctx, is_copy, blk, ready_event); - }, true)); - return blk; - }); + blk->pool.reset(new ThreadPool( + nthread, + [this, ctx, is_copy, blk] + (std::shared_ptr ready_event) { + this->GPUWorker(ctx, is_copy, blk, ready_event); + }, true)); + return blk; + }); if (ptr) { if (opr_block->opr->prop == FnProperty::kDeleteVar) { ptr->task_queue.PushFront(opr_block, opr_block->priority); @@ -177,7 +174,7 @@ class ThreadedEnginePerDevice : public ThreadedEngine { this->GPUWorker(ctx, is_copy, blk, ready_event); }, true)); return blk; - }); + }); if (ptr) { if (opr_block->opr->prop == FnProperty::kDeleteVar) { ptr->task_queue.PushFront(opr_block, opr_block->priority); @@ -207,9 +204,9 @@ class ThreadedEnginePerDevice : public ThreadedEngine { /*! \brief whether this is a worker thread. */ static MX_THREAD_LOCAL bool is_worker_; /*! \brief number of concurrent thread cpu worker uses */ - int cpu_worker_nthreads_; + size_t cpu_worker_nthreads_; /*! \brief number of concurrent thread each gpu worker uses */ - int gpu_worker_nthreads_; + size_t gpu_worker_nthreads_; // cpu worker common::LazyAllocArray > cpu_normal_workers_; // cpu priority worker @@ -228,12 +225,12 @@ class ThreadedEnginePerDevice : public ThreadedEngine { inline void GPUWorker(Context ctx, bool is_copy_worker, ThreadWorkerBlock *block, - std::shared_ptr ready_event) { + const std::shared_ptr& ready_event) { this->is_worker_ = true; #if MXNET_USE_CUDA mshadow::Stream *stream; do { - ThreadPool::SimpleEvent::SetReadyOnDestroy setReady(&ready_event); + ThreadPool::SimpleEvent::SetReadyOnDestroy setReady(ready_event); // allocate stream mshadow::SetDevice(ctx.dev_id); if (is_copy_worker) {