diff --git a/src/common/encryption/LMConnectorStatus.h b/src/common/encryption/LMConnectorStatus.h index 55a8ed94619..df9b9a295e7 100644 --- a/src/common/encryption/LMConnectorStatus.h +++ b/src/common/encryption/LMConnectorStatus.h @@ -80,6 +80,8 @@ enum class LMStatus { ErrParam, ErrInternalServer, OtherError, + // Nebula core implementation + ErrLeaderChange, }; struct LMStatusHelper { @@ -97,7 +99,8 @@ struct LMStatusHelper { {LMStatus::ErrInvalidCiphertext, "ErrInvalidCiphertext"}, {LMStatus::ErrParam, "ErrParam"}, {LMStatus::ErrInternalServer, "ErrInternalServer"}, - {LMStatus::OtherError, "OtherError"}}; + {LMStatus::OtherError, "OtherError"}, + {LMStatus::ErrLeaderChange, "ErrLeaderChange"}}; auto it = statusMap.find(status); if (it != statusMap.end()) { return it->second; diff --git a/src/common/encryption/LicenseManagerConnector.cpp b/src/common/encryption/LicenseManagerConnector.cpp index e914ee86adb..a7cb396835e 100644 --- a/src/common/encryption/LicenseManagerConnector.cpp +++ b/src/common/encryption/LicenseManagerConnector.cpp @@ -26,7 +26,10 @@ LicenseManagerConnector* LicenseManagerConnector::getInstance() { return &instance; } -Status LicenseManagerConnector::init(std::string LmUrl, kvstore::KVStore* kvstore) { +Status LicenseManagerConnector::init(const std::string& LmUrl, + kvstore::KVStore* kvstore, + const HostAddr& localHost) { + localhost_ = localHost; LMUrl_ = LmUrl; kvstore_ = kvstore; bgThread_ = std::make_unique(); @@ -45,10 +48,7 @@ Status LicenseManagerConnector::init(std::string LmUrl, kvstore::KVStore* kvstor if (!ok) { return Status::Error("Failed to start license manager connector thread"); } - - // Delay 1 seconds to wait for heartbeat - bgThread_->addDelayTask( - 1 * 1000 + folly::Random::rand32(900), &LicenseManagerConnector::threadFunc, this); + bgThread_->addTask(&LicenseManagerConnector::threadFunc, this); LOG(INFO) << "[License Manager] initialized successfully, LMId: " << LMId_; return Status::OK(); @@ -85,12 +85,16 @@ LMStatus LicenseManagerConnector::validateLicense() { // Build request auto rawRequest = buildRawRequest(kvstore_); - if (!rawRequest.ok()) { - LOG(ERROR) << "Failed to validate license, error: " << rawRequest.status().toString(); + if (!nebula::ok(rawRequest)) { // error + LOG(ERROR) << "Failed to validate license, error: " + << apache::thrift::util::enumNameSafe(rawRequest.left()); + if (rawRequest.left() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { + return LMStatus::ErrLeaderChange; + } return LMStatus::OtherError; } - auto request = buildValidateRequest(LMId_, productId_, rawRequest.value()); + auto request = buildValidateRequest(LMId_, productId_, rawRequest.right()); if (!request.ok()) { LOG(ERROR) << "Failed to validate license, error: " << request.status().toString(); return LMStatus::OtherError; @@ -113,11 +117,18 @@ void LicenseManagerConnector::threadFunc() { DLOG(INFO) << "[License Manager] connector thread is running"; unsigned int nextCheckPeriod = kDefaultCheckPeriodInSec + folly::Random::rand32(20 * 60); + SCOPE_EXIT { + bgThread_->addDelayTask(nextCheckPeriod * 1000, &LicenseManagerConnector::threadFunc, this); + }; - if (isRetrying_) { - LOG(WARNING) << "[License Manager] retrying to connect to license manager"; + // Check if the current host is the leader + if (!isLeader()) { + VLOG(1) << "[License Manager] current host is not the leader, skip checking"; + return; } + LOG_IF(WARNING, isRetrying_) << "[License Manager] retrying to connect to license manager"; + // validate license iff get LMId successfully auto rse = getLMId(); if (!rse.ok()) { @@ -158,7 +169,8 @@ void LicenseManagerConnector::threadFunc() { case LMStatus::ErrInvalidCiphertext: case LMStatus::ErrParam: case LMStatus::ErrInternalServer: - case LMStatus::OtherError: { + case LMStatus::OtherError: + case LMStatus::ErrLeaderChange: { if (!isLMNormal_) { dropAllHosts_ = true; } @@ -177,7 +189,6 @@ void LicenseManagerConnector::threadFunc() { // TODO(Aiee) for test only, should be removed before official v3.5 release nextCheckPeriod = 10; - bgThread_->addDelayTask(nextCheckPeriod * 1000, &LicenseManagerConnector::threadFunc, this); } StatusOr LicenseManagerConnector::buildValidateRequest(const std::string& lmId, @@ -207,12 +218,12 @@ StatusOr LicenseManagerConnector::buildValidateRequest(const std::s return folly::toJson(request); } -StatusOr LicenseManagerConnector::buildRawRequest(kvstore::KVStore* kvstore) { +ErrorOr LicenseManagerConnector::buildRawRequest( + kvstore::KVStore* kvstore) { // Get active graph hosts auto res = meta::ActiveHostsMan::getHostInfoByRole(kvstore, meta::cpp2::HostRole::GRAPH); if (!nebula::ok(res)) { - return Status::Error("Failed to get active hosts, error: %s", - apache::thrift::util::enumNameSafe(nebula::error(res)).c_str()); + return nebula::error(res); } auto graphNode = nebula::value(res).size(); @@ -221,13 +232,12 @@ StatusOr LicenseManagerConnector::buildRawRequest(kvstore::KVStore* for (auto& host : nebula::value(res)) { graphCPU += host.cpuNum_; } - DLOG(INFO) << "Total graph node number: " << graphNode << ", graph CPU cores: " << graphCPU; + VLOG(2) << "Total graph node number: " << graphNode << ", graph CPU cores: " << graphCPU; // Get active storage hosts res = meta::ActiveHostsMan::getHostInfoByRole(kvstore, meta::cpp2::HostRole::STORAGE); if (!nebula::ok(res)) { - return Status::Error("Failed to get active hosts, error: %s", - apache::thrift::util::enumNameSafe(nebula::error(res)).c_str()); + return nebula::error(res); } auto storageNode = nebula::value(res).size(); @@ -236,6 +246,7 @@ StatusOr LicenseManagerConnector::buildRawRequest(kvstore::KVStore* for (auto& host : nebula::value(res)) { storageCPU += host.cpuNum_; } + VLOG(2) << "Total storage node number: " << storageNode << ", storage CPU cores: " << storageCPU; folly::dynamic request = folly::dynamic::object(); auto timestamp = time::WallClock::fastNowInSec(); @@ -337,7 +348,8 @@ StatusOr LicenseManagerConnector::genAesDecKey(const std::string& l StatusOr LicenseManagerConnector::checkHttpResponse(const HttpResponse& resp) { // check response if (resp.curlCode != CURLE_OK) { - return Status::Error("Failed to connect to license manager"); + return Status::Error("Failed to connect to license manager, http curl code: %s", + curl_easy_strerror(resp.curlCode)); } // parse response @@ -545,4 +557,24 @@ LMStatus LicenseManagerConnector::handleResponseStatus(const std::string& messag return LMStatus::OtherError; } } + +void LicenseManagerConnector::setRetryFlag() { + if (!isRetrying_) { + isRetrying_ = true; + retryStartTime_ = time::WallClock::fastNowInSec(); + } +} + +bool LicenseManagerConnector::isRetryTimeout() { + return time::WallClock::fastNowInSec() - retryStartTime_ > kRetryTimeoutInSec; +} + +bool LicenseManagerConnector::isLeader() { + auto ret = kvstore_->partLeader(nebula::kDefaultSpaceId, nebula::kDefaultPartId); + if (!nebula::ok(ret)) { + LOG(WARNING) << "Part leader get failed"; + return false; + } + return nebula::value(ret) == localhost_; +} } // namespace nebula diff --git a/src/common/encryption/LicenseManagerConnector.h b/src/common/encryption/LicenseManagerConnector.h index af423962a34..427d548bc77 100644 --- a/src/common/encryption/LicenseManagerConnector.h +++ b/src/common/encryption/LicenseManagerConnector.h @@ -43,7 +43,7 @@ class LicenseManagerConnector final { static LicenseManagerConnector* getInstance(); // Init license manager client - Status init(std::string LmUrl, kvstore::KVStore* kvstore); + Status init(const std::string& LmUrl, kvstore::KVStore* kvstore, const HostAddr& localHost); // Get license manager ID // The response format is a json string: @@ -102,7 +102,7 @@ class LicenseManagerConnector final { // storageNode: 10 // } // } - static StatusOr buildRawRequest(kvstore::KVStore*); + static ErrorOr buildRawRequest(kvstore::KVStore*); // Encrypts raw request // The nonce is a random 32-byte value that must never be reused with the same key. @@ -181,18 +181,16 @@ class LicenseManagerConnector final { static LMStatus handleResponseStatus(const std::string& message, bool terminatedFlag = false); // Set retry flag and record the start time - void setRetryFlag() { - if (!isRetrying_) { - isRetrying_ = true; - retryStartTime_ = time::WallClock::fastNowInSec(); - } - } + void setRetryFlag(); - bool isRetryTimeout() { - return time::WallClock::fastNowInSec() - retryStartTime_ > kRetryTimeoutInSec; - } + bool isRetryTimeout(); + + // Checks if the current meta host is the leader + bool isLeader(); private: + // The host address of the meta server + HostAddr localhost_; // The license manager ID std::string LMId_ = ""; // The license manager url diff --git a/src/common/encryption/test/LicenseManagerConnectorTest.cpp b/src/common/encryption/test/LicenseManagerConnectorTest.cpp index d3006793acb..5a190645687 100644 --- a/src/common/encryption/test/LicenseManagerConnectorTest.cpp +++ b/src/common/encryption/test/LicenseManagerConnectorTest.cpp @@ -5,6 +5,7 @@ #include +#include "common/datatypes/HostAddr.h" #include "common/encryption/LicenseManagerConnector.h" #include "common/fs/TempDir.h" #include "mock/MockCluster.h" @@ -122,7 +123,7 @@ int main(int argc, char **argv) { FLAGS_heartbeat_interval_secs = 1; std::unique_ptr kv( nebula::mock::MockCluster::initMetaKV(rootPath.path())); - nebula::LMCIns->init(nebula::LMUrl, kv.get()); + nebula::LMCIns->init(nebula::LMUrl, kv.get(), nebula::HostAddr()); return RUN_ALL_TESTS(); } diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index 7e078d4250f..1a80b5c1ff6 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -194,7 +194,7 @@ int main(int argc, char* argv[]) { // Connect license manager LOG(INFO) << "Connect to license manager, address: " << FLAGS_license_manager_url; LMConnectorIns = LicenseManagerConnector::getInstance(); - LMConnectorIns->init(FLAGS_license_manager_url, gKVStore.get()); + LMConnectorIns->init(FLAGS_license_manager_url, gKVStore.get(), localhost); auto webSvc = std::make_unique(); status = initWebService(webSvc.get(), gKVStore.get()); diff --git a/src/meta/processors/admin/HBProcessor.cpp b/src/meta/processors/admin/HBProcessor.cpp index 087a846d644..77a694e3869 100644 --- a/src/meta/processors/admin/HBProcessor.cpp +++ b/src/meta/processors/admin/HBProcessor.cpp @@ -32,28 +32,18 @@ void HBProcessor::process(const cpp2::HBReq& req) { HostAddr host((*req.host_ref()).host, (*req.host_ref()).port); auto role = req.get_role(); - auto cpuCore = *req.get_cpu_cores(); LOG(INFO) << "Receive heartbeat from " << host << ", role = " << apache::thrift::util::enumNameSafe(role); - auto LMCIns = LicenseManagerConnector::getInstance(); - if (!LMCIns->isMockingForTest()) { - // Check whether meta should accept the heartbeat - if (LMCIns->dropAllHosts_) { - LOG(INFO) << "License is not valid, reject heartbeat from " << host; - handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_LICENSE_MANAGER_STATUS); - onFinished(); - return; - } - - auto ret = checkResourceUsage(role, host, cpuCore); - if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - handleErrorCode(ret); - onFinished(); - return; - } + // Since v3.5.0 enterprise, we add the cpu_cores field in the heartbeat request + if (req.get_cpu_cores() == nullptr) { + LOG(ERROR) << "cpu_cores field is missing in the heartbeat request, check the version of host"; + handleErrorCode(nebula::cpp2::ErrorCode::E_FAIL_TO_CONNECT); + onFinished(); + return; } + auto cpuCore = *req.get_cpu_cores(); std::vector data; if (role == cpp2::HostRole::STORAGE || role == cpp2::HostRole::META_LISTENER || @@ -92,6 +82,25 @@ void HBProcessor::process(const cpp2::HBReq& req) { } } + // Check cluster resource usage + auto LMCIns = LicenseManagerConnector::getInstance(); + if (!LMCIns->isMockingForTest()) { + // Check whether meta should accept the heartbeat + if (LMCIns->dropAllHosts_) { + LOG(INFO) << "License is not valid, reject heartbeat from " << host; + handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_LICENSE_MANAGER_STATUS); + onFinished(); + return; + } + + auto ret = checkResourceUsage(role, host, cpuCore); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + handleErrorCode(ret); + onFinished(); + return; + } + } + // update host info HostInfo info( time::WallClock::fastNowInMilliSec(), role, req.get_git_info_sha(), *req.get_cpu_cores()); @@ -272,6 +281,9 @@ nebula::cpp2::ErrorCode HBProcessor::checkResourceUsage(const cpp2::HostRole rol if (!LMStatusHelper::isStatusNormal(validateRes)) { LOG(ERROR) << "[License Manager] Failed to validate license: " << LMStatusHelper::statusToString(validateRes); + if (validateRes == LMStatus::ErrLeaderChange) { + return nebula::cpp2::ErrorCode::E_LEADER_CHANGED; + } return nebula::cpp2::ErrorCode::E_INVALID_LICENSE_MANAGER_STATUS; }