diff --git a/src/chunkserver/concurrent_apply/concurrent_apply.cpp b/src/chunkserver/concurrent_apply/concurrent_apply.cpp index dd7b219502..48d7130681 100644 --- a/src/chunkserver/concurrent_apply/concurrent_apply.cpp +++ b/src/chunkserver/concurrent_apply/concurrent_apply.cpp @@ -124,8 +124,11 @@ void ConcurrentApplyModule::Run(ApplyTaskType type, int index) { } void ConcurrentApplyModule::Stop() { + if (!start_.exchange(false)) { + return; + } + LOG(INFO) << "stop ConcurrentApplyModule..."; - start_ = false; auto wakeup = []() {}; for (auto iter : rapplyMap_) { iter.second->tq.Push(wakeup); @@ -145,15 +148,36 @@ void ConcurrentApplyModule::Stop() { } void ConcurrentApplyModule::Flush() { + if (!start_.load(std::memory_order_relaxed)) { + return; + } + CountDownEvent event(wconcurrentsize_); - auto flushtask = [&event]() { - event.Signal(); - }; + auto flushtask = [&event]() { event.Signal(); }; + + for (int i = 0; i < wconcurrentsize_; i++) { + wapplyMap_[i]->tq.Push(flushtask); + } + + event.Wait(); +} + +void ConcurrentApplyModule::FlushAll() { + if (!start_.load(std::memory_order_relaxed)) { + return; + } + + CountDownEvent event(wconcurrentsize_ + rconcurrentsize_); + auto flushtask = [&event]() { event.Signal(); }; for (int i = 0; i < wconcurrentsize_; i++) { wapplyMap_[i]->tq.Push(flushtask); } + for (int i = 0; i < rconcurrentsize_; i++) { + rapplyMap_[i]->tq.Push(flushtask); + } + event.Wait(); } diff --git a/src/chunkserver/concurrent_apply/concurrent_apply.h b/src/chunkserver/concurrent_apply/concurrent_apply.h index a4e9712348..60b10a3e55 100644 --- a/src/chunkserver/concurrent_apply/concurrent_apply.h +++ b/src/chunkserver/concurrent_apply/concurrent_apply.h @@ -107,6 +107,7 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule { * Flush: finish all task in write threads */ void Flush(); + void FlushAll(); void Stop(); @@ -128,7 +129,7 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule { explicit TaskThread(size_t capacity) : tq(capacity) {} }; - bool start_; + std::atomic start_; int rconcurrentsize_; int rqueuedepth_; int wconcurrentsize_;