Skip to content

Commit

Permalink
[refactor] CurveBS: add some common functions
Browse files Browse the repository at this point in the history
Signed-off-by: Xinlong Chen <[email protected]>
  • Loading branch information
Xinlong-Chen authored and Cyber-SiKu committed Oct 23, 2023
1 parent 45eca24 commit e738ee1
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
32 changes: 28 additions & 4 deletions src/chunkserver/concurrent_apply/concurrent_apply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ void ConcurrentApplyModule::Run(ApplyTaskType type, int index) {
}

void ConcurrentApplyModule::Stop() {
if (!start_.exchange(false)) {
return;
}

LOG(INFO) << "stop ConcurrentApplyModule...";
start_ = false;
auto wakeup = []() {};
for (auto iter : rapplyMap_) {
iter.second->tq.Push(wakeup);
Expand All @@ -145,15 +148,36 @@ void ConcurrentApplyModule::Stop() {
}

void ConcurrentApplyModule::Flush() {
if (!start_.load(std::memory_order_relaxed)) {
return;
}

CountDownEvent event(wconcurrentsize_);
auto flushtask = [&event]() {
event.Signal();
};
auto flushtask = [&event]() { event.Signal(); };

for (int i = 0; i < wconcurrentsize_; i++) {
wapplyMap_[i]->tq.Push(flushtask);
}

event.Wait();
}

void ConcurrentApplyModule::FlushAll() {
if (!start_.load(std::memory_order_relaxed)) {
return;
}

CountDownEvent event(wconcurrentsize_ + rconcurrentsize_);
auto flushtask = [&event]() { event.Signal(); };

for (int i = 0; i < wconcurrentsize_; i++) {
wapplyMap_[i]->tq.Push(flushtask);
}

for (int i = 0; i < rconcurrentsize_; i++) {
rapplyMap_[i]->tq.Push(flushtask);
}

event.Wait();
}

Expand Down
3 changes: 2 additions & 1 deletion src/chunkserver/concurrent_apply/concurrent_apply.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
* Flush: finish all task in write threads
*/
void Flush();
void FlushAll();

void Stop();

Expand All @@ -128,7 +129,7 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
explicit TaskThread(size_t capacity) : tq(capacity) {}
};

bool start_;
std::atomic<bool> start_;
int rconcurrentsize_;
int rqueuedepth_;
int wconcurrentsize_;
Expand Down

0 comments on commit e738ee1

Please sign in to comment.