Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add heartbeat processor #343

Merged
merged 5 commits into from
May 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ enum ErrorCode {
E_NO_HOSTS = -21,
E_EXISTED = -22,
E_NOT_FOUND = -23,
E_INVALID_HOST = -24,

// KV Failure
E_STORE_FAILURE = -31,
Expand Down Expand Up @@ -288,6 +289,15 @@ struct ScanResp {
2: list<string> values,
}

struct HBResp {
1: ErrorCode code,
2: common.HostAddr leader,
}

struct HBReq {
1: common.HostAddr host,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timestamp ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For what?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when a storage node offline, could get the final reporting time.

Or recorded it in logs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have recorded its last hb-timestamp

}

service MetaService {
ExecResp createSpace(1: CreateSpaceReq req);
ExecResp dropSpace(1: DropSpaceReq req);
Expand Down Expand Up @@ -317,5 +327,7 @@ service MetaService {
RemoveResp remove(1: RemoveReq req);
RemoveRangeResp removeRange(1: RemoveRangeReq req);
ScanResp scan(1: ScanReq req);

HBResp heartBeat(1: HBReq req);
}

14 changes: 7 additions & 7 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ ResultCode RocksEngine::get(const std::string& key, std::string* value) {
if (status.ok()) {
return ResultCode::SUCCEEDED;
} else if (status.IsNotFound()) {
LOG(ERROR) << "Get: " << key << " Not Found";
VLOG(3) << "Get: " << key << " Not Found";
return ResultCode::ERR_KEY_NOT_FOUND;
} else {
LOG(ERROR) << "Get Failed: " << key << " " << status.ToString();
VLOG(3) << "Get Failed: " << key << " " << status.ToString();
return ResultCode::ERR_UNKNOWN;
}
}
Expand Down Expand Up @@ -91,7 +91,7 @@ ResultCode RocksEngine::put(std::string key, std::string value) {
if (status.ok()) {
return ResultCode::SUCCEEDED;
} else {
LOG(ERROR) << "Put Failed: " << key << status.ToString();
VLOG(3) << "Put Failed: " << key << status.ToString();
return ResultCode::ERR_UNKNOWN;
}
}
Expand All @@ -108,7 +108,7 @@ ResultCode RocksEngine::multiPut(std::vector<KV> keyValues) {
if (status.ok()) {
return ResultCode::SUCCEEDED;
} else {
LOG(ERROR) << "MultiPut Failed: " << status.ToString();
VLOG(3) << "MultiPut Failed: " << status.ToString();
return ResultCode::ERR_UNKNOWN;
}
}
Expand Down Expand Up @@ -146,7 +146,7 @@ ResultCode RocksEngine::remove(const std::string& key) {
if (status.ok()) {
return ResultCode::SUCCEEDED;
} else {
LOG(ERROR) << "Remove Failed: " << key << status.ToString();
VLOG(3) << "Remove Failed: " << key << status.ToString();
return ResultCode::ERR_UNKNOWN;
}
}
Expand All @@ -162,7 +162,7 @@ ResultCode RocksEngine::multiRemove(std::vector<std::string> keys) {
if (status.ok()) {
return ResultCode::SUCCEEDED;
} else {
LOG(ERROR) << "MultiRemove Failed: " << status.ToString();
VLOG(3) << "MultiRemove Failed: " << status.ToString();
return ResultCode::ERR_UNKNOWN;
}
}
Expand All @@ -176,7 +176,7 @@ ResultCode RocksEngine::removeRange(const std::string& start,
if (status.ok()) {
return ResultCode::SUCCEEDED;
} else {
LOG(ERROR) << "RemoveRange Failed: " << status.ToString();
VLOG(3) << "RemoveRange Failed: " << status.ToString();
return ResultCode::ERR_UNKNOWN;
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ class ActiveHostsMan final {
ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds)
: intervalSeconds_(intervalSeconds)
, expirationInSeconds_(expiredSeconds) {
CHECK_GT(intervalSeconds, 0)
<< "intervalSeconds " << intervalSeconds << " should > 0!";
CHECK_GE(expiredSeconds, intervalSeconds)
<< "expiredSeconds " << expiredSeconds
<< " should >= intervalSeconds " << intervalSeconds;
CHECK(checkThread_.start());
checkThread_.addTimerTask(intervalSeconds * 1000,
intervalSeconds * 1000,
Expand All @@ -50,7 +55,7 @@ class ActiveHostsMan final {
checkThread_.wait();
}

void updateHostInfo(HostAddr hostAddr, HostInfo info) {
void updateHostInfo(const HostAddr& hostAddr, const HostInfo& info) {
folly::RWSpinLock::ReadHolder rh(&lock_);
auto it = hostsMap_.find(hostAddr);
if (it == hostsMap_.end()) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ add_library(
processors/RemoveProcessor.cpp
processors/RemoveRangeProcessor.cpp
processors/ScanProcessor.cpp
processors/HBProcessor.cpp
)

add_dependencies(
Expand Down
7 changes: 7 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "meta/processors/RemoveProcessor.h"
#include "meta/processors/RemoveRangeProcessor.h"
#include "meta/processors/GetPartsAllocProcessor.h"
#include "meta/processors/HBProcessor.h"

#define RETURN_FUTURE(processor) \
auto f = processor->getFuture(); \
Expand Down Expand Up @@ -155,5 +156,11 @@ MetaServiceHandler::future_listEdges(const cpp2::ListEdgesReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::HBResp>
MetaServiceHandler::future_heartBeat(const cpp2::HBReq& req) {
auto* processor = HBProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

} // namespace meta
} // namespace nebula
6 changes: 6 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListEdgesResp>
future_listEdges(const cpp2::ListEdgesReq& req) override;

/**
* HeartBeat
* */
folly::Future<cpp2::HBResp>
future_heartBeat(const cpp2::HBReq& req) override;

private:
kvstore::KVStore* kvstore_ = nullptr;
};
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ class BaseProcessor {
Status spaceExist(GraphSpaceID spaceId);

/**
* Check multi host_name exists or not.
* Check host has been registered or not.
* */
Status hostsExist(const std::vector<std::string>& name);
Status hostExist(const std::string& hostKey);

/**
* Return the spaceId for name.
Expand Down
24 changes: 6 additions & 18 deletions src/meta/processors/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,13 @@ Status BaseProcessor<RESP>::spaceExist(GraphSpaceID spaceId) {
}

template<typename RESP>
Status BaseProcessor<RESP>::hostsExist(const std::vector<std::string> &hostsKey) {
for (const auto& hostKey : hostsKey) {
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, hostKey , &val);
if (ret != kvstore::ResultCode::SUCCEEDED) {
if (ret == kvstore::ResultCode::ERR_KEY_NOT_FOUND) {
nebula::cpp2::HostAddr host = MetaServiceUtils::parseHostKey(hostKey);
std::string ip = NetworkUtils::intToIPv4(host.get_ip());
int32_t port = host.get_port();
VLOG(3) << "Error, host IP " << ip << " port " << port
<< " not exist";
return Status::HostNotFound();
} else {
VLOG(3) << "Unknown Error ,, ret = " << static_cast<int32_t>(ret);
return Status::Error("Unknown error!");
}
}
Status BaseProcessor<RESP>::hostExist(const std::string& hostKey) {
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, hostKey , &val);
if (ret == kvstore::ResultCode::SUCCEEDED) {
return Status::OK();
}
return Status::OK();
return Status::HostNotFound();
}

template<typename RESP>
Expand Down
21 changes: 21 additions & 0 deletions src/meta/processors/HBProcessor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/


#include "meta/processors/HBProcessor.h"

DEFINE_int32(expired_hosts_check_interval_sec, 20,
"Check the expired hosts at the interval");
DEFINE_int32(expired_threshold_sec, 10 * 60,
"Hosts will be expired in this time if no heartbeat received");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a parameter check be better? for example :
static bool checkHBFlags(const char* fn, int32_t th) {
   if (FLAGS_expired_hosts_check_interval_sec > th) 
     return false;
   return true;
}
static const bool invalid = RegisterFlagValidator(&FLAGS_expired_threshold_sec, &checkHBFlags);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

namespace nebula {
namespace meta {


} // namespace meta
} // namespace nebula

61 changes: 61 additions & 0 deletions src/meta/processors/HBProcessor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/

#ifndef META_HBPROCESSOR_H_
#define META_HBPROCESSOR_H_

#include <gtest/gtest_prod.h>
#include "meta/processors/BaseProcessor.h"
#include "meta/ActiveHostsMan.h"
#include "time/TimeUtils.h"

DECLARE_int32(expired_hosts_check_interval_sec);
DECLARE_int32(expired_threshold_sec);

namespace nebula {
namespace meta {

class HBProcessor : public BaseProcessor<cpp2::HBResp> {
FRIEND_TEST(HBProcessorTest, HBTest);

public:
static HBProcessor* instance(kvstore::KVStore* kvstore) {
return new HBProcessor(kvstore);
}

void process(const cpp2::HBReq& req) {
HostAddr host(req.host.ip, req.host.port);
if (hostExist(MetaServiceUtils::hostKey(host.first, host.second))
== Status::HostNotFound()) {
LOG(INFO) << "Reject unregistered host " << host << "!";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add a interface to handler the situation uniformly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Output HostAddr to stream has been supported.

resp_.set_code(cpp2::ErrorCode::E_INVALID_HOST);
onFinished();
return;
}

LOG(INFO) << "Receive heartbeat from " << host;
HostInfo info;
info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds();
hostsMan()->updateHostInfo(host, info);
onFinished();
}

private:
explicit HBProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::HBResp>(kvstore) {}

static ActiveHostsMan* hostsMan() {
static auto hostsMan
= std::make_unique<ActiveHostsMan>(FLAGS_expired_hosts_check_interval_sec,
FLAGS_expired_threshold_sec);
return hostsMan.get();
}
};

} // namespace meta
} // namespace nebula

#endif // META_HBPROCESSOR_H_
20 changes: 9 additions & 11 deletions src/meta/processors/RemoveHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,18 @@ namespace meta {

void RemoveHostsProcessor::process(const cpp2::RemoveHostsReq& req) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());

std::vector<std::string> hostsKey;
for (auto& h : req.get_hosts()) {
hostsKey.emplace_back(MetaServiceUtils::hostKey(h.ip, h.port));
}

auto hostsRet = hostsExist(hostsKey);
if (!hostsRet.ok()) {
resp_.set_code(to(std::move(hostsRet)));
onFinished();
return;
auto hostKey = MetaServiceUtils::hostKey(h.ip, h.port);
auto ret = hostExist(hostKey);
if (!ret.ok()) {
LOG(WARNING) << "The host " << HostAddr(h.ip, h.port) << " not existed!";
resp_.set_code(to(ret));
onFinished();
return;
}
hostsKey.emplace_back(std::move(hostKey));
}

LOG(INFO) << "Remove hosts ";
resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
doMultiRemove(std::move(hostsKey));
}
Expand Down
26 changes: 26 additions & 0 deletions src/meta/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,32 @@ nebula_link_libraries(
)
nebula_add_test(processor_test)

add_executable(
hb_processor_test
HBProcessorTest.cpp
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:kvstore_obj>
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:network_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:schema_obj>
)
nebula_link_libraries(
hb_processor_test
proxygenhttpserver
proxygenlib
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
wangle
gtest
)
nebula_add_test(hb_processor_test)

add_executable(
meta_client_test
MetaClientTest.cpp
Expand Down
Loading