Skip to content

Commit

Permalink
break loop in unreportedAdminThread_ if the sever being stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul committed Oct 9, 2021
1 parent c1933dc commit ee0517a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 14 deletions.
34 changes: 21 additions & 13 deletions src/storage/admin/AdminTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ bool AdminTaskManager::init() {
}

bgThread_->addTask(&AdminTaskManager::schedule, this);
shutdown_ = false;
shutdown_.store(false, std::memory_order_release);
ifAnyUnreported_.store(false, std::memory_order_release);
handleUnreportedTasks();
LOG(INFO) << "exit AdminTaskManager::init()";
return true;
Expand All @@ -41,11 +42,11 @@ void AdminTaskManager::handleUnreportedTasks() {
std::tuple<JobID, TaskID, std::string, folly::Future<StatusOr<nebula::cpp2::ErrorCode>>>;
if (env_ == nullptr) return;
unreportedAdminThread_.reset(new std::thread([this] {
bool ifAny = true;
std::unique_lock<std::mutex> lk(unreportedMutex_);
while (true) {
std::unique_lock<std::mutex> lk(unreportedMutex_);
if (!ifAny) unreportedCV_.wait(lk);
ifAny = false;
if (!ifAnyUnreported_.load(std::memory_order_acquire)) unreportedCV_.wait(lk);
if (shutdown_.load(std::memory_order_acquire)) break;
ifAnyUnreported_.store(false, std::memory_order_release);
std::unique_ptr<kvstore::KVIterator> iter;
auto kvRet = env_->adminStore_->scan(&iter);
if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED || iter == nullptr) continue;
Expand Down Expand Up @@ -92,15 +93,15 @@ void AdminTaskManager::handleUnreportedTasks() {
if (!fut.hasValue()) {
LOG(INFO) << folly::sformat(
"reportTaskFinish() got rpc error:, job={}, task={}", jobId, taskId);
ifAny = true;
ifAnyUnreported_.store(true, std::memory_order_release);
continue;
}
if (!fut.value().ok()) {
LOG(INFO) << folly::sformat("reportTaskFinish() has bad status:, job={}, task={}, rc={}",
jobId,
taskId,
fut.value().status().toString());
ifAny = true;
ifAnyUnreported_.store(true, std::memory_order_release);
continue;
}
rc = fut.value().value();
Expand All @@ -110,7 +111,7 @@ void AdminTaskManager::handleUnreportedTasks() {
apache::thrift::util::enumNameSafe(rc));
if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED ||
rc == nebula::cpp2::ErrorCode::E_STORE_FAILURE) {
ifAny = true;
ifAnyUnreported_.store(true, std::memory_order_release);
continue;
} else {
keys.emplace_back(key.data(), key.size());
Expand All @@ -119,6 +120,7 @@ void AdminTaskManager::handleUnreportedTasks() {
}
env_->adminStore_->multiRemove(keys);
}
LOG(INFO) << "Unreported-Admin-Thread stopped";
}));
}

Expand Down Expand Up @@ -168,7 +170,8 @@ nebula::cpp2::ErrorCode AdminTaskManager::cancelTask(JobID jobId, TaskID taskId)

void AdminTaskManager::shutdown() {
LOG(INFO) << "enter AdminTaskManager::shutdown()";
shutdown_ = true;
shutdown_.store(true, std::memory_order_release);
notifyReporting();
bgThread_->stop();
bgThread_->wait();

Expand All @@ -177,6 +180,7 @@ void AdminTaskManager::shutdown() {
}

pool_->join();
if (unreportedAdminThread_ != nullptr) unreportedAdminThread_->join();
LOG(INFO) << "exit AdminTaskManager::shutdown()";
}

Expand Down Expand Up @@ -207,14 +211,14 @@ void AdminTaskManager::removeTaskStatus(JobID jobId, TaskID taskId) {
// schedule
void AdminTaskManager::schedule() {
std::chrono::milliseconds interval{20}; // 20ms
while (!shutdown_) {
while (!shutdown_.load(std::memory_order_acquire)) {
LOG(INFO) << "waiting for incoming task";
folly::Optional<TaskHandle> optTaskHandle{folly::none};
while (!optTaskHandle && !shutdown_) {
while (!optTaskHandle && !shutdown_.load(std::memory_order_acquire)) {
optTaskHandle = taskQueue_.try_take_for(interval);
}

if (shutdown_) {
if (shutdown_.load(std::memory_order_acquire)) {
LOG(INFO) << "detect AdminTaskManager::shutdown()";
return;
}
Expand Down Expand Up @@ -308,7 +312,11 @@ void AdminTaskManager::runSubTask(TaskHandle handle) {
}
}

void AdminTaskManager::notifyReporting() { unreportedCV_.notify_one(); }
void AdminTaskManager::notifyReporting() {
std::unique_lock<std::mutex> lk(unreportedMutex_, std::try_to_lock);
if (!lk.owns_lock()) ifAnyUnreported_.store(true, std::memory_order_release);
unreportedCV_.notify_one();
}

bool AdminTaskManager::isFinished(JobID jobID, TaskID taskID) {
auto iter = tasks_.find(std::make_pair(jobID, taskID));
Expand Down
3 changes: 2 additions & 1 deletion src/storage/admin/AdminTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class AdminTaskManager {
void runSubTask(TaskHandle handle);

private:
bool shutdown_{false};
std::atomic<bool> shutdown_{false};
std::unique_ptr<ThreadPool> pool_{nullptr};
TaskContainer tasks_;
TaskQueue taskQueue_;
Expand All @@ -77,6 +77,7 @@ class AdminTaskManager {
std::unique_ptr<std::thread> unreportedAdminThread_;
std::mutex unreportedMutex_;
std::condition_variable unreportedCV_;
std::atomic<bool> ifAnyUnreported_ = false;
};

} // namespace storage
Expand Down

0 comments on commit ee0517a

Please sign in to comment.