Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix storage hang on terminate #3014

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmake/nebula/GeneralCompilerConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ include_directories(AFTER ${CMAKE_CURRENT_BINARY_DIR}/src)

if(ENABLE_WERROR)
add_compile_options(-Werror)
add_compile_options(-Wno-attributes)
Copy link
Contributor

@darionyaphet darionyaphet Oct 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why support this compile flag? which problem dose it solved?

endif()

if(NOT ENABLE_STRICT_ALIASING)
Expand Down
64 changes: 48 additions & 16 deletions src/storage/admin/AdminTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ bool AdminTaskManager::init() {
return false;
}

shutdown_.store(false, std::memory_order_release);
bgThread_->addTask(&AdminTaskManager::schedule, this);
shutdown_ = false;
ifAnyUnreported_ = true;
handleUnreportedTasks();
LOG(INFO) << "exit AdminTaskManager::init()";
return true;
Expand All @@ -41,20 +42,29 @@ void AdminTaskManager::handleUnreportedTasks() {
std::tuple<JobID, TaskID, std::string, folly::Future<StatusOr<nebula::cpp2::ErrorCode>>>;
if (env_ == nullptr) return;
unreportedAdminThread_.reset(new std::thread([this] {
bool ifAny = true;
while (true) {
std::unique_lock<std::mutex> lk(unreportedMutex_);
if (!ifAny) unreportedCV_.wait(lk);
ifAny = false;
if (!ifAnyUnreported_) {
unreportedCV_.wait(lk);
}
if (shutdown_.load(std::memory_order_acquire)) {
break;
}
kikimo marked this conversation as resolved.
Show resolved Hide resolved
ifAnyUnreported_ = false;
std::unique_ptr<kvstore::KVIterator> iter;
auto kvRet = env_->adminStore_->scan(&iter);
if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED || iter == nullptr) continue;
lk.unlock();
if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED || iter == nullptr) {
continue;
}
std::vector<std::string> keys;
std::vector<futTuple> futVec;
for (; iter->valid(); iter->next()) {
folly::StringPiece key = iter->key();
int32_t seqId = *reinterpret_cast<const int32_t*>(key.data());
if (seqId < 0) continue;
if (seqId < 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the key is consists of several items. would check it cast to integer?

continue;
}
JobID jobId = *reinterpret_cast<const JobID*>(key.data() + sizeof(int32_t));
TaskID taskId =
*reinterpret_cast<const TaskID*>(key.data() + sizeof(int32_t) + sizeof(JobID));
Expand All @@ -67,14 +77,17 @@ void AdminTaskManager::handleUnreportedTasks() {
nebula::cpp2::ErrorCode errCode =
*reinterpret_cast<const nebula::cpp2::ErrorCode*>(val.data());
meta::cpp2::StatsItem* pStats = nullptr;
if (errCode == nebula::cpp2::ErrorCode::SUCCEEDED) pStats = &statsItem;
if (errCode == nebula::cpp2::ErrorCode::SUCCEEDED) {
pStats = &statsItem;
}
LOG(INFO) << folly::sformat("reportTaskFinish(), job={}, task={}, rc={}",
jobId,
taskId,
apache::thrift::util::enumNameSafe(errCode));
if (seqId < env_->adminSeqId_) {
if (jobStatus == nebula::meta::cpp2::JobStatus::RUNNING && pStats != nullptr)
if (jobStatus == nebula::meta::cpp2::JobStatus::RUNNING && pStats != nullptr) {
pStats->set_status(nebula::meta::cpp2::JobStatus::FAILED);
}
auto fut = env_->metaClient_->reportTaskFinish(jobId, taskId, errCode, pStats);
futVec.emplace_back(std::move(jobId), std::move(taskId), std::move(key), std::move(fut));
} else if (jobStatus != nebula::meta::cpp2::JobStatus::RUNNING) {
Expand All @@ -92,15 +105,15 @@ void AdminTaskManager::handleUnreportedTasks() {
if (!fut.hasValue()) {
LOG(INFO) << folly::sformat(
"reportTaskFinish() got rpc error:, job={}, task={}", jobId, taskId);
ifAny = true;
ifAnyUnreported_ = true;
continue;
}
if (!fut.value().ok()) {
LOG(INFO) << folly::sformat("reportTaskFinish() has bad status:, job={}, task={}, rc={}",
jobId,
taskId,
fut.value().status().toString());
ifAny = true;
ifAnyUnreported_ = true;
continue;
}
rc = fut.value().value();
Expand All @@ -110,7 +123,7 @@ void AdminTaskManager::handleUnreportedTasks() {
apache::thrift::util::enumNameSafe(rc));
if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED ||
rc == nebula::cpp2::ErrorCode::E_STORE_FAILURE) {
ifAny = true;
ifAnyUnreported_ = true;
continue;
} else {
keys.emplace_back(key.data(), key.size());
Expand All @@ -119,6 +132,7 @@ void AdminTaskManager::handleUnreportedTasks() {
}
env_->adminStore_->multiRemove(keys);
}
LOG(INFO) << "Unreported-Admin-Thread stopped";
}));
}

Expand Down Expand Up @@ -168,7 +182,8 @@ nebula::cpp2::ErrorCode AdminTaskManager::cancelTask(JobID jobId, TaskID taskId)

void AdminTaskManager::shutdown() {
LOG(INFO) << "enter AdminTaskManager::shutdown()";
shutdown_ = true;
shutdown_.store(true, std::memory_order_release);
notifyReporting();
bgThread_->stop();
bgThread_->wait();

Expand All @@ -177,6 +192,9 @@ void AdminTaskManager::shutdown() {
}

pool_->join();
if (unreportedAdminThread_ != nullptr) {
unreportedAdminThread_->join();
}
LOG(INFO) << "exit AdminTaskManager::shutdown()";
}

Expand Down Expand Up @@ -207,14 +225,14 @@ void AdminTaskManager::removeTaskStatus(JobID jobId, TaskID taskId) {
// schedule
void AdminTaskManager::schedule() {
std::chrono::milliseconds interval{20}; // 20ms
while (!shutdown_) {
while (!shutdown_.load(std::memory_order_acquire)) {
LOG(INFO) << "waiting for incoming task";
folly::Optional<TaskHandle> optTaskHandle{folly::none};
while (!optTaskHandle && !shutdown_) {
while (!optTaskHandle && !shutdown_.load(std::memory_order_acquire)) {
optTaskHandle = taskQueue_.try_take_for(interval);
}

if (shutdown_) {
if (shutdown_.load(std::memory_order_acquire)) {
LOG(INFO) << "detect AdminTaskManager::shutdown()";
return;
}
Expand Down Expand Up @@ -308,7 +326,21 @@ void AdminTaskManager::runSubTask(TaskHandle handle) {
}
}

void AdminTaskManager::notifyReporting() { unreportedCV_.notify_one(); }
void AdminTaskManager::notifyReporting() {
std::unique_lock<std::mutex> lk(unreportedMutex_);
ifAnyUnreported_ = true;
unreportedCV_.notify_one();
}

void AdminTaskManager::saveAndNotify(JobID jobId,
TaskID taskId,
nebula::cpp2::ErrorCode rc,
const nebula::meta::cpp2::StatsItem& result) {
std::unique_lock<std::mutex> lk(unreportedMutex_);
saveTaskStatus(jobId, taskId, rc, result);
ifAnyUnreported_ = true;
unreportedCV_.notify_one();
}

bool AdminTaskManager::isFinished(JobID jobID, TaskID taskID) {
auto iter = tasks_.find(std::make_pair(jobID, taskID));
Expand Down
8 changes: 7 additions & 1 deletion src/storage/admin/AdminTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,17 @@ class AdminTaskManager {

void notifyReporting();

void saveAndNotify(JobID jobId,
TaskID taskId,
nebula::cpp2::ErrorCode rc,
const nebula::meta::cpp2::StatsItem& result);

private:
void schedule();
void runSubTask(TaskHandle handle);

private:
bool shutdown_{false};
std::atomic<bool> shutdown_{false};
std::unique_ptr<ThreadPool> pool_{nullptr};
TaskContainer tasks_;
TaskQueue taskQueue_;
Expand All @@ -77,6 +82,7 @@ class AdminTaskManager {
std::unique_ptr<std::thread> unreportedAdminThread_;
std::mutex unreportedMutex_;
std::condition_variable unreportedCV_;
bool ifAnyUnreported_{true};
};

} // namespace storage
Expand Down
3 changes: 1 addition & 2 deletions src/storage/admin/AdminTaskProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ void AdminTaskProcessor::process(const cpp2::AddAdminTaskRequest& req) {

auto cb = [taskManager, jobId = req.get_job_id(), taskId = req.get_task_id()](
nebula::cpp2::ErrorCode errCode, nebula::meta::cpp2::StatsItem& result) {
taskManager->saveTaskStatus(jobId, taskId, errCode, result);
taskManager->notifyReporting();
taskManager->saveAndNotify(jobId, taskId, errCode, result);
};

TaskContext ctx(req, std::move(cb));
Expand Down