diff --git a/src/storage/admin/AdminTaskManager.cpp b/src/storage/admin/AdminTaskManager.cpp index 65a831937b6..41a142b2e8a 100644 --- a/src/storage/admin/AdminTaskManager.cpp +++ b/src/storage/admin/AdminTaskManager.cpp @@ -29,8 +29,9 @@ bool AdminTaskManager::init() { return false; } + shutdown_.store(false, std::memory_order_release); bgThread_->addTask(&AdminTaskManager::schedule, this); - shutdown_ = false; + ifAnyUnreported_ = false; handleUnreportedTasks(); LOG(INFO) << "exit AdminTaskManager::init()"; return true; @@ -41,20 +42,29 @@ void AdminTaskManager::handleUnreportedTasks() { std::tuple>>; if (env_ == nullptr) return; unreportedAdminThread_.reset(new std::thread([this] { - bool ifAny = true; while (true) { std::unique_lock lk(unreportedMutex_); - if (!ifAny) unreportedCV_.wait(lk); - ifAny = false; + if (!ifAnyUnreported_) { + unreportedCV_.wait(lk); + } + if (shutdown_.load(std::memory_order_acquire)) { + break; + } + ifAnyUnreported_ = false; std::unique_ptr iter; auto kvRet = env_->adminStore_->scan(&iter); - if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED || iter == nullptr) continue; + lk.unlock(); + if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED || iter == nullptr) { + continue; + } std::vector keys; std::vector futVec; for (; iter->valid(); iter->next()) { folly::StringPiece key = iter->key(); int32_t seqId = *reinterpret_cast(key.data()); - if (seqId < 0) continue; + if (seqId < 0) { + continue; + } JobID jobId = *reinterpret_cast(key.data() + sizeof(int32_t)); TaskID taskId = *reinterpret_cast(key.data() + sizeof(int32_t) + sizeof(JobID)); @@ -67,14 +77,17 @@ void AdminTaskManager::handleUnreportedTasks() { nebula::cpp2::ErrorCode errCode = *reinterpret_cast(val.data()); meta::cpp2::StatsItem* pStats = nullptr; - if (errCode == nebula::cpp2::ErrorCode::SUCCEEDED) pStats = &statsItem; + if (errCode == nebula::cpp2::ErrorCode::SUCCEEDED) { + pStats = &statsItem; + } LOG(INFO) << folly::sformat("reportTaskFinish(), job={}, task={}, rc={}", jobId, taskId, apache::thrift::util::enumNameSafe(errCode)); if (seqId < env_->adminSeqId_) { - if (jobStatus == nebula::meta::cpp2::JobStatus::RUNNING && pStats != nullptr) + if (jobStatus == nebula::meta::cpp2::JobStatus::RUNNING && pStats != nullptr) { pStats->set_status(nebula::meta::cpp2::JobStatus::FAILED); + } auto fut = env_->metaClient_->reportTaskFinish(jobId, taskId, errCode, pStats); futVec.emplace_back(std::move(jobId), std::move(taskId), std::move(key), std::move(fut)); } else if (jobStatus != nebula::meta::cpp2::JobStatus::RUNNING) { @@ -92,7 +105,7 @@ void AdminTaskManager::handleUnreportedTasks() { if (!fut.hasValue()) { LOG(INFO) << folly::sformat( "reportTaskFinish() got rpc error:, job={}, task={}", jobId, taskId); - ifAny = true; + ifAnyUnreported_ = true; continue; } if (!fut.value().ok()) { @@ -100,7 +113,7 @@ void AdminTaskManager::handleUnreportedTasks() { jobId, taskId, fut.value().status().toString()); - ifAny = true; + ifAnyUnreported_ = true; continue; } rc = fut.value().value(); @@ -110,7 +123,7 @@ void AdminTaskManager::handleUnreportedTasks() { apache::thrift::util::enumNameSafe(rc)); if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED || rc == nebula::cpp2::ErrorCode::E_STORE_FAILURE) { - ifAny = true; + ifAnyUnreported_ = true; continue; } else { keys.emplace_back(key.data(), key.size()); @@ -119,6 +132,7 @@ void AdminTaskManager::handleUnreportedTasks() { } env_->adminStore_->multiRemove(keys); } + LOG(INFO) << "Unreported-Admin-Thread stopped"; })); } @@ -168,7 +182,8 @@ nebula::cpp2::ErrorCode AdminTaskManager::cancelTask(JobID jobId, TaskID taskId) void AdminTaskManager::shutdown() { LOG(INFO) << "enter AdminTaskManager::shutdown()"; - shutdown_ = true; + shutdown_.store(true, std::memory_order_release); + notifyReporting(); bgThread_->stop(); bgThread_->wait(); @@ -177,6 +192,9 @@ void AdminTaskManager::shutdown() { } pool_->join(); + if (unreportedAdminThread_ != nullptr) { + unreportedAdminThread_->join(); + } LOG(INFO) << "exit AdminTaskManager::shutdown()"; } @@ -207,14 +225,14 @@ void AdminTaskManager::removeTaskStatus(JobID jobId, TaskID taskId) { // schedule void AdminTaskManager::schedule() { std::chrono::milliseconds interval{20}; // 20ms - while (!shutdown_) { + while (!shutdown_.load(std::memory_order_acquire)) { LOG(INFO) << "waiting for incoming task"; folly::Optional optTaskHandle{folly::none}; - while (!optTaskHandle && !shutdown_) { + while (!optTaskHandle && !shutdown_.load(std::memory_order_acquire)) { optTaskHandle = taskQueue_.try_take_for(interval); } - if (shutdown_) { + if (shutdown_.load(std::memory_order_acquire)) { LOG(INFO) << "detect AdminTaskManager::shutdown()"; return; } @@ -308,7 +326,21 @@ void AdminTaskManager::runSubTask(TaskHandle handle) { } } -void AdminTaskManager::notifyReporting() { unreportedCV_.notify_one(); } +void AdminTaskManager::notifyReporting() { + std::unique_lock lk(unreportedMutex_); + ifAnyUnreported_ = true; + unreportedCV_.notify_one(); +} + +void AdminTaskManager::saveAndNotify(JobID jobId, + TaskID taskId, + nebula::cpp2::ErrorCode rc, + const nebula::meta::cpp2::StatsItem& result) { + std::unique_lock lk(unreportedMutex_); + saveTaskStatus(jobId, taskId, rc, result); + ifAnyUnreported_ = true; + unreportedCV_.notify_one(); +} bool AdminTaskManager::isFinished(JobID jobID, TaskID taskID) { auto iter = tasks_.find(std::make_pair(jobID, taskID)); diff --git a/src/storage/admin/AdminTaskManager.h b/src/storage/admin/AdminTaskManager.h index 70f4974c4fd..88d11d286ad 100644 --- a/src/storage/admin/AdminTaskManager.h +++ b/src/storage/admin/AdminTaskManager.h @@ -63,12 +63,17 @@ class AdminTaskManager { void notifyReporting(); + void saveAndNotify(JobID jobId, + TaskID taskId, + nebula::cpp2::ErrorCode rc, + const nebula::meta::cpp2::StatsItem& result); + private: void schedule(); void runSubTask(TaskHandle handle); private: - bool shutdown_{false}; + std::atomic shutdown_{false}; std::unique_ptr pool_{nullptr}; TaskContainer tasks_; TaskQueue taskQueue_; @@ -77,6 +82,7 @@ class AdminTaskManager { std::unique_ptr unreportedAdminThread_; std::mutex unreportedMutex_; std::condition_variable unreportedCV_; + bool ifAnyUnreported_{false}; }; } // namespace storage diff --git a/src/storage/admin/AdminTaskProcessor.cpp b/src/storage/admin/AdminTaskProcessor.cpp index 06b1cc63965..c26192266e8 100644 --- a/src/storage/admin/AdminTaskProcessor.cpp +++ b/src/storage/admin/AdminTaskProcessor.cpp @@ -18,8 +18,7 @@ void AdminTaskProcessor::process(const cpp2::AddAdminTaskRequest& req) { auto cb = [taskManager, jobId = req.get_job_id(), taskId = req.get_task_id()]( nebula::cpp2::ErrorCode errCode, nebula::meta::cpp2::StatsItem& result) { - taskManager->saveTaskStatus(jobId, taskId, errCode, result); - taskManager->notifyReporting(); + taskManager->saveAndNotify(jobId, taskId, errCode, result); }; TaskContext ctx(req, std::move(cb));