Skip to content

Commit

Permalink
Handle leader change when checking license in the HB (vesoft-inc#2618)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aiee authored Apr 14, 2023
1 parent febdf69 commit ad25fe2
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 50 deletions.
5 changes: 4 additions & 1 deletion src/common/encryption/LMConnectorStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ enum class LMStatus {
ErrParam,
ErrInternalServer,
OtherError,
// Nebula core implementation
ErrLeaderChange,
};

struct LMStatusHelper {
Expand All @@ -97,7 +99,8 @@ struct LMStatusHelper {
{LMStatus::ErrInvalidCiphertext, "ErrInvalidCiphertext"},
{LMStatus::ErrParam, "ErrParam"},
{LMStatus::ErrInternalServer, "ErrInternalServer"},
{LMStatus::OtherError, "OtherError"}};
{LMStatus::OtherError, "OtherError"},
{LMStatus::ErrLeaderChange, "ErrLeaderChange"}};
auto it = statusMap.find(status);
if (it != statusMap.end()) {
return it->second;
Expand Down
70 changes: 51 additions & 19 deletions src/common/encryption/LicenseManagerConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ LicenseManagerConnector* LicenseManagerConnector::getInstance() {
return &instance;
}

Status LicenseManagerConnector::init(std::string LmUrl, kvstore::KVStore* kvstore) {
Status LicenseManagerConnector::init(const std::string& LmUrl,
kvstore::KVStore* kvstore,
const HostAddr& localHost) {
localhost_ = localHost;
LMUrl_ = LmUrl;
kvstore_ = kvstore;
bgThread_ = std::make_unique<thread::GenericWorker>();
Expand All @@ -45,10 +48,7 @@ Status LicenseManagerConnector::init(std::string LmUrl, kvstore::KVStore* kvstor
if (!ok) {
return Status::Error("Failed to start license manager connector thread");
}

// Delay 1 seconds to wait for heartbeat
bgThread_->addDelayTask(
1 * 1000 + folly::Random::rand32(900), &LicenseManagerConnector::threadFunc, this);
bgThread_->addTask(&LicenseManagerConnector::threadFunc, this);

LOG(INFO) << "[License Manager] initialized successfully, LMId: " << LMId_;
return Status::OK();
Expand Down Expand Up @@ -85,12 +85,16 @@ LMStatus LicenseManagerConnector::validateLicense() {

// Build request
auto rawRequest = buildRawRequest(kvstore_);
if (!rawRequest.ok()) {
LOG(ERROR) << "Failed to validate license, error: " << rawRequest.status().toString();
if (!nebula::ok(rawRequest)) { // error
LOG(ERROR) << "Failed to validate license, error: "
<< apache::thrift::util::enumNameSafe(rawRequest.left());
if (rawRequest.left() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
return LMStatus::ErrLeaderChange;
}
return LMStatus::OtherError;
}

auto request = buildValidateRequest(LMId_, productId_, rawRequest.value());
auto request = buildValidateRequest(LMId_, productId_, rawRequest.right());
if (!request.ok()) {
LOG(ERROR) << "Failed to validate license, error: " << request.status().toString();
return LMStatus::OtherError;
Expand All @@ -113,11 +117,18 @@ void LicenseManagerConnector::threadFunc() {

DLOG(INFO) << "[License Manager] connector thread is running";
unsigned int nextCheckPeriod = kDefaultCheckPeriodInSec + folly::Random::rand32(20 * 60);
SCOPE_EXIT {
bgThread_->addDelayTask(nextCheckPeriod * 1000, &LicenseManagerConnector::threadFunc, this);
};

if (isRetrying_) {
LOG(WARNING) << "[License Manager] retrying to connect to license manager";
// Check if the current host is the leader
if (!isLeader()) {
VLOG(1) << "[License Manager] current host is not the leader, skip checking";
return;
}

LOG_IF(WARNING, isRetrying_) << "[License Manager] retrying to connect to license manager";

// validate license iff get LMId successfully
auto rse = getLMId();
if (!rse.ok()) {
Expand Down Expand Up @@ -158,7 +169,8 @@ void LicenseManagerConnector::threadFunc() {
case LMStatus::ErrInvalidCiphertext:
case LMStatus::ErrParam:
case LMStatus::ErrInternalServer:
case LMStatus::OtherError: {
case LMStatus::OtherError:
case LMStatus::ErrLeaderChange: {
if (!isLMNormal_) {
dropAllHosts_ = true;
}
Expand All @@ -177,7 +189,6 @@ void LicenseManagerConnector::threadFunc() {

// TODO(Aiee) for test only, should be removed before official v3.5 release
nextCheckPeriod = 10;
bgThread_->addDelayTask(nextCheckPeriod * 1000, &LicenseManagerConnector::threadFunc, this);
}

StatusOr<std::string> LicenseManagerConnector::buildValidateRequest(const std::string& lmId,
Expand Down Expand Up @@ -207,12 +218,12 @@ StatusOr<std::string> LicenseManagerConnector::buildValidateRequest(const std::s
return folly::toJson(request);
}

StatusOr<std::string> LicenseManagerConnector::buildRawRequest(kvstore::KVStore* kvstore) {
ErrorOr<nebula::cpp2::ErrorCode, std::string> LicenseManagerConnector::buildRawRequest(
kvstore::KVStore* kvstore) {
// Get active graph hosts
auto res = meta::ActiveHostsMan::getHostInfoByRole(kvstore, meta::cpp2::HostRole::GRAPH);
if (!nebula::ok(res)) {
return Status::Error("Failed to get active hosts, error: %s",
apache::thrift::util::enumNameSafe(nebula::error(res)).c_str());
return nebula::error(res);
}
auto graphNode = nebula::value(res).size();

Expand All @@ -221,13 +232,12 @@ StatusOr<std::string> LicenseManagerConnector::buildRawRequest(kvstore::KVStore*
for (auto& host : nebula::value(res)) {
graphCPU += host.cpuNum_;
}
DLOG(INFO) << "Total graph node number: " << graphNode << ", graph CPU cores: " << graphCPU;
VLOG(2) << "Total graph node number: " << graphNode << ", graph CPU cores: " << graphCPU;

// Get active storage hosts
res = meta::ActiveHostsMan::getHostInfoByRole(kvstore, meta::cpp2::HostRole::STORAGE);
if (!nebula::ok(res)) {
return Status::Error("Failed to get active hosts, error: %s",
apache::thrift::util::enumNameSafe(nebula::error(res)).c_str());
return nebula::error(res);
}
auto storageNode = nebula::value(res).size();

Expand All @@ -236,6 +246,7 @@ StatusOr<std::string> LicenseManagerConnector::buildRawRequest(kvstore::KVStore*
for (auto& host : nebula::value(res)) {
storageCPU += host.cpuNum_;
}
VLOG(2) << "Total storage node number: " << storageNode << ", storage CPU cores: " << storageCPU;

folly::dynamic request = folly::dynamic::object();
auto timestamp = time::WallClock::fastNowInSec();
Expand Down Expand Up @@ -337,7 +348,8 @@ StatusOr<std::string> LicenseManagerConnector::genAesDecKey(const std::string& l
StatusOr<folly::dynamic> LicenseManagerConnector::checkHttpResponse(const HttpResponse& resp) {
// check response
if (resp.curlCode != CURLE_OK) {
return Status::Error("Failed to connect to license manager");
return Status::Error("Failed to connect to license manager, http curl code: %s",
curl_easy_strerror(resp.curlCode));
}

// parse response
Expand Down Expand Up @@ -545,4 +557,24 @@ LMStatus LicenseManagerConnector::handleResponseStatus(const std::string& messag
return LMStatus::OtherError;
}
}

void LicenseManagerConnector::setRetryFlag() {
if (!isRetrying_) {
isRetrying_ = true;
retryStartTime_ = time::WallClock::fastNowInSec();
}
}

bool LicenseManagerConnector::isRetryTimeout() {
return time::WallClock::fastNowInSec() - retryStartTime_ > kRetryTimeoutInSec;
}

bool LicenseManagerConnector::isLeader() {
auto ret = kvstore_->partLeader(nebula::kDefaultSpaceId, nebula::kDefaultPartId);
if (!nebula::ok(ret)) {
LOG(WARNING) << "Part leader get failed";
return false;
}
return nebula::value(ret) == localhost_;
}
} // namespace nebula
20 changes: 9 additions & 11 deletions src/common/encryption/LicenseManagerConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class LicenseManagerConnector final {
static LicenseManagerConnector* getInstance();

// Init license manager client
Status init(std::string LmUrl, kvstore::KVStore* kvstore);
Status init(const std::string& LmUrl, kvstore::KVStore* kvstore, const HostAddr& localHost);

// Get license manager ID
// The response format is a json string:
Expand Down Expand Up @@ -102,7 +102,7 @@ class LicenseManagerConnector final {
// storageNode: 10
// }
// }
static StatusOr<std::string> buildRawRequest(kvstore::KVStore*);
static ErrorOr<nebula::cpp2::ErrorCode, std::string> buildRawRequest(kvstore::KVStore*);

// Encrypts raw request
// The nonce is a random 32-byte value that must never be reused with the same key.
Expand Down Expand Up @@ -181,18 +181,16 @@ class LicenseManagerConnector final {
static LMStatus handleResponseStatus(const std::string& message, bool terminatedFlag = false);

// Set retry flag and record the start time
void setRetryFlag() {
if (!isRetrying_) {
isRetrying_ = true;
retryStartTime_ = time::WallClock::fastNowInSec();
}
}
void setRetryFlag();

bool isRetryTimeout() {
return time::WallClock::fastNowInSec() - retryStartTime_ > kRetryTimeoutInSec;
}
bool isRetryTimeout();

// Checks if the current meta host is the leader
bool isLeader();

private:
// The host address of the meta server
HostAddr localhost_;
// The license manager ID
std::string LMId_ = "";
// The license manager url
Expand Down
3 changes: 2 additions & 1 deletion src/common/encryption/test/LicenseManagerConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <gtest/gtest.h>

#include "common/datatypes/HostAddr.h"
#include "common/encryption/LicenseManagerConnector.h"
#include "common/fs/TempDir.h"
#include "mock/MockCluster.h"
Expand Down Expand Up @@ -122,7 +123,7 @@ int main(int argc, char **argv) {
FLAGS_heartbeat_interval_secs = 1;
std::unique_ptr<nebula::kvstore::KVStore> kv(
nebula::mock::MockCluster::initMetaKV(rootPath.path()));
nebula::LMCIns->init(nebula::LMUrl, kv.get());
nebula::LMCIns->init(nebula::LMUrl, kv.get(), nebula::HostAddr());

return RUN_ALL_TESTS();
}
2 changes: 1 addition & 1 deletion src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ int main(int argc, char* argv[]) {
// Connect license manager
LOG(INFO) << "Connect to license manager, address: " << FLAGS_license_manager_url;
LMConnectorIns = LicenseManagerConnector::getInstance();
LMConnectorIns->init(FLAGS_license_manager_url, gKVStore.get());
LMConnectorIns->init(FLAGS_license_manager_url, gKVStore.get(), localhost);

auto webSvc = std::make_unique<nebula::WebService>();
status = initWebService(webSvc.get(), gKVStore.get());
Expand Down
46 changes: 29 additions & 17 deletions src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,18 @@ void HBProcessor::process(const cpp2::HBReq& req) {

HostAddr host((*req.host_ref()).host, (*req.host_ref()).port);
auto role = req.get_role();
auto cpuCore = *req.get_cpu_cores();

LOG(INFO) << "Receive heartbeat from " << host
<< ", role = " << apache::thrift::util::enumNameSafe(role);

auto LMCIns = LicenseManagerConnector::getInstance();
if (!LMCIns->isMockingForTest()) {
// Check whether meta should accept the heartbeat
if (LMCIns->dropAllHosts_) {
LOG(INFO) << "License is not valid, reject heartbeat from " << host;
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_LICENSE_MANAGER_STATUS);
onFinished();
return;
}

auto ret = checkResourceUsage(role, host, cpuCore);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(ret);
onFinished();
return;
}
// Since v3.5.0 enterprise, we add the cpu_cores field in the heartbeat request
if (req.get_cpu_cores() == nullptr) {
LOG(ERROR) << "cpu_cores field is missing in the heartbeat request, check the version of host";
handleErrorCode(nebula::cpp2::ErrorCode::E_FAIL_TO_CONNECT);
onFinished();
return;
}
auto cpuCore = *req.get_cpu_cores();

std::vector<kvstore::KV> data;
if (role == cpp2::HostRole::STORAGE || role == cpp2::HostRole::META_LISTENER ||
Expand Down Expand Up @@ -92,6 +82,25 @@ void HBProcessor::process(const cpp2::HBReq& req) {
}
}

// Check cluster resource usage
auto LMCIns = LicenseManagerConnector::getInstance();
if (!LMCIns->isMockingForTest()) {
// Check whether meta should accept the heartbeat
if (LMCIns->dropAllHosts_) {
LOG(INFO) << "License is not valid, reject heartbeat from " << host;
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_LICENSE_MANAGER_STATUS);
onFinished();
return;
}

auto ret = checkResourceUsage(role, host, cpuCore);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(ret);
onFinished();
return;
}
}

// update host info
HostInfo info(
time::WallClock::fastNowInMilliSec(), role, req.get_git_info_sha(), *req.get_cpu_cores());
Expand Down Expand Up @@ -272,6 +281,9 @@ nebula::cpp2::ErrorCode HBProcessor::checkResourceUsage(const cpp2::HostRole rol
if (!LMStatusHelper::isStatusNormal(validateRes)) {
LOG(ERROR) << "[License Manager] Failed to validate license: "
<< LMStatusHelper::statusToString(validateRes);
if (validateRes == LMStatus::ErrLeaderChange) {
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return nebula::cpp2::ErrorCode::E_INVALID_LICENSE_MANAGER_STATUS;
}

Expand Down

0 comments on commit ad25fe2

Please sign in to comment.