Skip to content

Commit

Permalink
break loop in unreportedAdminThread_ if the sever being stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul committed Oct 9, 2021
1 parent c1933dc commit 15465a8
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 19 deletions.
64 changes: 48 additions & 16 deletions src/storage/admin/AdminTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ bool AdminTaskManager::init() {
}

bgThread_->addTask(&AdminTaskManager::schedule, this);
shutdown_ = false;
shutdown_.store(false, std::memory_order_release);
ifAnyUnreported_ = false;
handleUnreportedTasks();
LOG(INFO) << "exit AdminTaskManager::init()";
return true;
Expand All @@ -41,20 +42,29 @@ void AdminTaskManager::handleUnreportedTasks() {
std::tuple<JobID, TaskID, std::string, folly::Future<StatusOr<nebula::cpp2::ErrorCode>>>;
if (env_ == nullptr) return;
unreportedAdminThread_.reset(new std::thread([this] {
bool ifAny = true;
while (true) {
std::unique_lock<std::mutex> lk(unreportedMutex_);
if (!ifAny) unreportedCV_.wait(lk);
ifAny = false;
if (!ifAnyUnreported_) {
unreportedCV_.wait(lk);
}
if (shutdown_.load(std::memory_order_acquire)) {
break;
}
ifAnyUnreported_ = false;
std::unique_ptr<kvstore::KVIterator> iter;
auto kvRet = env_->adminStore_->scan(&iter);
if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED || iter == nullptr) continue;
lk.unlock();
if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED || iter == nullptr) {
continue;
}
std::vector<std::string> keys;
std::vector<futTuple> futVec;
for (; iter->valid(); iter->next()) {
folly::StringPiece key = iter->key();
int32_t seqId = *reinterpret_cast<const int32_t*>(key.data());
if (seqId < 0) continue;
if (seqId < 0) {
continue;
}
JobID jobId = *reinterpret_cast<const JobID*>(key.data() + sizeof(int32_t));
TaskID taskId =
*reinterpret_cast<const TaskID*>(key.data() + sizeof(int32_t) + sizeof(JobID));
Expand All @@ -67,14 +77,17 @@ void AdminTaskManager::handleUnreportedTasks() {
nebula::cpp2::ErrorCode errCode =
*reinterpret_cast<const nebula::cpp2::ErrorCode*>(val.data());
meta::cpp2::StatsItem* pStats = nullptr;
if (errCode == nebula::cpp2::ErrorCode::SUCCEEDED) pStats = &statsItem;
if (errCode == nebula::cpp2::ErrorCode::SUCCEEDED) {
pStats = &statsItem;
}
LOG(INFO) << folly::sformat("reportTaskFinish(), job={}, task={}, rc={}",
jobId,
taskId,
apache::thrift::util::enumNameSafe(errCode));
if (seqId < env_->adminSeqId_) {
if (jobStatus == nebula::meta::cpp2::JobStatus::RUNNING && pStats != nullptr)
if (jobStatus == nebula::meta::cpp2::JobStatus::RUNNING && pStats != nullptr) {
pStats->set_status(nebula::meta::cpp2::JobStatus::FAILED);
}
auto fut = env_->metaClient_->reportTaskFinish(jobId, taskId, errCode, pStats);
futVec.emplace_back(std::move(jobId), std::move(taskId), std::move(key), std::move(fut));
} else if (jobStatus != nebula::meta::cpp2::JobStatus::RUNNING) {
Expand All @@ -92,15 +105,15 @@ void AdminTaskManager::handleUnreportedTasks() {
if (!fut.hasValue()) {
LOG(INFO) << folly::sformat(
"reportTaskFinish() got rpc error:, job={}, task={}", jobId, taskId);
ifAny = true;
ifAnyUnreported_ = true;
continue;
}
if (!fut.value().ok()) {
LOG(INFO) << folly::sformat("reportTaskFinish() has bad status:, job={}, task={}, rc={}",
jobId,
taskId,
fut.value().status().toString());
ifAny = true;
ifAnyUnreported_ = true;
continue;
}
rc = fut.value().value();
Expand All @@ -110,7 +123,7 @@ void AdminTaskManager::handleUnreportedTasks() {
apache::thrift::util::enumNameSafe(rc));
if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED ||
rc == nebula::cpp2::ErrorCode::E_STORE_FAILURE) {
ifAny = true;
ifAnyUnreported_ = true;
continue;
} else {
keys.emplace_back(key.data(), key.size());
Expand All @@ -119,6 +132,7 @@ void AdminTaskManager::handleUnreportedTasks() {
}
env_->adminStore_->multiRemove(keys);
}
LOG(INFO) << "Unreported-Admin-Thread stopped";
}));
}

Expand Down Expand Up @@ -168,7 +182,8 @@ nebula::cpp2::ErrorCode AdminTaskManager::cancelTask(JobID jobId, TaskID taskId)

void AdminTaskManager::shutdown() {
LOG(INFO) << "enter AdminTaskManager::shutdown()";
shutdown_ = true;
shutdown_.store(true, std::memory_order_release);
notifyReporting();
bgThread_->stop();
bgThread_->wait();

Expand All @@ -177,6 +192,9 @@ void AdminTaskManager::shutdown() {
}

pool_->join();
if (unreportedAdminThread_ != nullptr) {
unreportedAdminThread_->join();
}
LOG(INFO) << "exit AdminTaskManager::shutdown()";
}

Expand Down Expand Up @@ -207,14 +225,14 @@ void AdminTaskManager::removeTaskStatus(JobID jobId, TaskID taskId) {
// schedule
void AdminTaskManager::schedule() {
std::chrono::milliseconds interval{20}; // 20ms
while (!shutdown_) {
while (!shutdown_.load(std::memory_order_acquire)) {
LOG(INFO) << "waiting for incoming task";
folly::Optional<TaskHandle> optTaskHandle{folly::none};
while (!optTaskHandle && !shutdown_) {
while (!optTaskHandle && !shutdown_.load(std::memory_order_acquire)) {
optTaskHandle = taskQueue_.try_take_for(interval);
}

if (shutdown_) {
if (shutdown_.load(std::memory_order_acquire)) {
LOG(INFO) << "detect AdminTaskManager::shutdown()";
return;
}
Expand Down Expand Up @@ -308,7 +326,21 @@ void AdminTaskManager::runSubTask(TaskHandle handle) {
}
}

void AdminTaskManager::notifyReporting() { unreportedCV_.notify_one(); }
void AdminTaskManager::notifyReporting() {
std::unique_lock<std::mutex> lk(unreportedMutex_);
ifAnyUnreported_ = true;
unreportedCV_.notify_one();
}

void AdminTaskManager::saveAndNotify(JobID jobId,
TaskID taskId,
nebula::cpp2::ErrorCode rc,
const nebula::meta::cpp2::StatsItem& result) {
std::unique_lock<std::mutex> lk(unreportedMutex_);
saveTaskStatus(jobId, taskId, rc, result);
ifAnyUnreported_ = true;
unreportedCV_.notify_one();
}

bool AdminTaskManager::isFinished(JobID jobID, TaskID taskID) {
auto iter = tasks_.find(std::make_pair(jobID, taskID));
Expand Down
8 changes: 7 additions & 1 deletion src/storage/admin/AdminTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,17 @@ class AdminTaskManager {

void notifyReporting();

void saveAndNotify(JobID jobId,
TaskID taskId,
nebula::cpp2::ErrorCode rc,
const nebula::meta::cpp2::StatsItem& result);

private:
void schedule();
void runSubTask(TaskHandle handle);

private:
bool shutdown_{false};
std::atomic<bool> shutdown_{false};
std::unique_ptr<ThreadPool> pool_{nullptr};
TaskContainer tasks_;
TaskQueue taskQueue_;
Expand All @@ -77,6 +82,7 @@ class AdminTaskManager {
std::unique_ptr<std::thread> unreportedAdminThread_;
std::mutex unreportedMutex_;
std::condition_variable unreportedCV_;
bool ifAnyUnreported_{false};
};

} // namespace storage
Expand Down
3 changes: 1 addition & 2 deletions src/storage/admin/AdminTaskProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ void AdminTaskProcessor::process(const cpp2::AddAdminTaskRequest& req) {

auto cb = [taskManager, jobId = req.get_job_id(), taskId = req.get_task_id()](
nebula::cpp2::ErrorCode errCode, nebula::meta::cpp2::StatsItem& result) {
taskManager->saveTaskStatus(jobId, taskId, errCode, result);
taskManager->notifyReporting();
taskManager->saveAndNotify(jobId, taskId, errCode, result);
};

TaskContext ctx(req, std::move(cb));
Expand Down

0 comments on commit 15465a8

Please sign in to comment.