From 1b512e48f32e44164546a669c16ea3893e3b12bf Mon Sep 17 00:00:00 2001 From: heng Date: Tue, 7 May 2019 16:01:52 +0800 Subject: [PATCH 1/3] x --- src/interface/meta.thrift | 11 ++++++++++ src/meta/MetaServiceHandler.cpp | 7 +++++++ src/meta/MetaServiceHandler.h | 6 ++++++ src/meta/processors/HBProcessor.cpp | 16 +++++++++++++++ src/meta/processors/HBProcessor.h | 31 +++++++++++++++++++++++++++++ 5 files changed, 71 insertions(+) create mode 100644 src/meta/processors/HBProcessor.cpp create mode 100644 src/meta/processors/HBProcessor.h diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index df14c05d49a..5672724113d 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -281,6 +281,15 @@ struct ScanResp { 2: list values, } +struct HBResp { + 1: ErrorCode code, + 2: common.HostAddr leader, +} + +struct HBReq { + 1: common.HostAddr host, +} + service MetaService { ExecResp createSpace(1: CreateSpaceReq req); ExecResp dropSpace(1: DropSpaceReq req); @@ -310,5 +319,7 @@ service MetaService { RemoveResp remove(1: RemoveReq req); RemoveRangeResp removeRange(1: RemoveRangeReq req); ScanResp scan(1: ScanReq req); + + HBResp heartBeat(1: HBReq req); } diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index 59b404044e9..56fdd1a7184 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -26,6 +26,7 @@ #include "meta/processors/RemoveProcessor.h" #include "meta/processors/RemoveRangeProcessor.h" #include "meta/processors/GetPartsAllocProcessor.h" +#include "meta/processors/HBProcessor.h" #define RETURN_FUTURE(processor) \ auto f = processor->getFuture(); \ @@ -155,5 +156,11 @@ MetaServiceHandler::future_listEdges(const cpp2::ListEdgesReq& req) { RETURN_FUTURE(processor); } +folly::Future +MetaServiceHandler::future_heartBeat(const cpp2::HBReq& req) { + auto* processor = HBProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + } // namespace meta } // namespace nebula diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index 37c9686a25a..fdff1ecb6c7 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -89,6 +89,12 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { folly::Future future_listEdges(const cpp2::ListEdgesReq& req) override; + /** + * HeartBeat + * */ + folly::Future + future_heartBeat(const cpp2::HBReq& req) override; + private: kvstore::KVStore* kvstore_ = nullptr; }; diff --git a/src/meta/processors/HBProcessor.cpp b/src/meta/processors/HBProcessor.cpp new file mode 100644 index 00000000000..3455e471cde --- /dev/null +++ b/src/meta/processors/HBProcessor.cpp @@ -0,0 +1,16 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + + +#include "meta/processors/HBProcessor.h" + +namespace nebula { +namespace meta { + + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/HBProcessor.h b/src/meta/processors/HBProcessor.h new file mode 100644 index 00000000000..dc7e922569d --- /dev/null +++ b/src/meta/processors/HBProcessor.h @@ -0,0 +1,31 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_HBPROCESSOR_H_ +#define META_HBPROCESSOR_H_ + +#include "meta/processors/BaseProcessor.h" + +namespace nebula { +namespace meta { + +class HBProcessor : public BaseProcessor { +public: + static HBProcessor* instance(kvstore::KVStore* kvstore) { + return new HBProcessor(kvstore); + } + + void process(const cpp2::HBReq& req); + +private: + explicit HBProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} +}; + +} // namespace meta +} // namespace nebula + +#endif // META_HBPROCESSOR_H_ From 1534172e53e42393f79f4992200b5d53e84ae612 Mon Sep 17 00:00:00 2001 From: heng Date: Wed, 8 May 2019 20:31:43 +0800 Subject: [PATCH 2/3] Add heartbeat processor --- src/interface/meta.thrift | 1 + src/kvstore/RocksEngine.cpp | 14 +-- src/meta/ActiveHostsMan.h | 2 +- src/meta/CMakeLists.txt | 1 + src/meta/processors/BaseProcessor.h | 4 +- src/meta/processors/BaseProcessor.inl | 24 ++--- src/meta/processors/HBProcessor.cpp | 5 ++ src/meta/processors/HBProcessor.h | 34 ++++++- src/meta/processors/RemoveHostsProcessor.cpp | 20 ++--- src/meta/test/CMakeLists.txt | 26 ++++++ src/meta/test/HBProcessorTest.cpp | 93 ++++++++++++++++++++ 11 files changed, 183 insertions(+), 41 deletions(-) create mode 100644 src/meta/test/HBProcessorTest.cpp diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 5672724113d..e848511079c 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -24,6 +24,7 @@ enum ErrorCode { E_NO_HOSTS = -21, E_EXISTED = -22, E_NOT_FOUND = -23, + E_INVALID_HOST = -24, // KV Failure E_STORE_FAILURE = -31, diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index 373ec370e93..3aca8a0c711 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -56,10 +56,10 @@ ResultCode RocksEngine::get(const std::string& key, std::string* value) { if (status.ok()) { return ResultCode::SUCCEEDED; } else if (status.IsNotFound()) { - LOG(ERROR) << "Get: " << key << " Not Found"; + VLOG(3) << "Get: " << key << " Not Found"; return ResultCode::ERR_KEY_NOT_FOUND; } else { - LOG(ERROR) << "Get Failed: " << key << " " << status.ToString(); + VLOG(3) << "Get Failed: " << key << " " << status.ToString(); return ResultCode::ERR_UNKNOWN; } } @@ -91,7 +91,7 @@ ResultCode RocksEngine::put(std::string key, std::string value) { if (status.ok()) { return ResultCode::SUCCEEDED; } else { - LOG(ERROR) << "Put Failed: " << key << status.ToString(); + VLOG(3) << "Put Failed: " << key << status.ToString(); return ResultCode::ERR_UNKNOWN; } } @@ -108,7 +108,7 @@ ResultCode RocksEngine::multiPut(std::vector keyValues) { if (status.ok()) { return ResultCode::SUCCEEDED; } else { - LOG(ERROR) << "MultiPut Failed: " << status.ToString(); + VLOG(3) << "MultiPut Failed: " << status.ToString(); return ResultCode::ERR_UNKNOWN; } } @@ -146,7 +146,7 @@ ResultCode RocksEngine::remove(const std::string& key) { if (status.ok()) { return ResultCode::SUCCEEDED; } else { - LOG(ERROR) << "Remove Failed: " << key << status.ToString(); + VLOG(3) << "Remove Failed: " << key << status.ToString(); return ResultCode::ERR_UNKNOWN; } } @@ -162,7 +162,7 @@ ResultCode RocksEngine::multiRemove(std::vector keys) { if (status.ok()) { return ResultCode::SUCCEEDED; } else { - LOG(ERROR) << "MultiRemove Failed: " << status.ToString(); + VLOG(3) << "MultiRemove Failed: " << status.ToString(); return ResultCode::ERR_UNKNOWN; } } @@ -176,7 +176,7 @@ ResultCode RocksEngine::removeRange(const std::string& start, if (status.ok()) { return ResultCode::SUCCEEDED; } else { - LOG(ERROR) << "RemoveRange Failed: " << status.ToString(); + VLOG(3) << "RemoveRange Failed: " << status.ToString(); return ResultCode::ERR_UNKNOWN; } } diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index e2a83559e97..8b265e697ee 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -50,7 +50,7 @@ class ActiveHostsMan final { checkThread_.wait(); } - void updateHostInfo(HostAddr hostAddr, HostInfo info) { + void updateHostInfo(const HostAddr& hostAddr, const HostInfo& info) { folly::RWSpinLock::ReadHolder rh(&lock_); auto it = hostsMap_.find(hostAddr); if (it == hostsMap_.end()) { diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 07fa18448e1..4aaa8ed21ea 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -33,6 +33,7 @@ add_library( processors/RemoveProcessor.cpp processors/RemoveRangeProcessor.cpp processors/ScanProcessor.cpp + processors/HBProcessor.cpp ) add_dependencies( diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index e9532172b44..db4b177abd9 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -166,9 +166,9 @@ class BaseProcessor { Status spaceExist(GraphSpaceID spaceId); /** - * Check multi host_name exists or not. + * Check host has been registered or not. * */ - Status hostsExist(const std::vector& name); + Status hostExist(const std::string& hostKey); /** * Return the spaceId for name. diff --git a/src/meta/processors/BaseProcessor.inl b/src/meta/processors/BaseProcessor.inl index ceb07999f13..1593a1652a2 100644 --- a/src/meta/processors/BaseProcessor.inl +++ b/src/meta/processors/BaseProcessor.inl @@ -152,25 +152,13 @@ Status BaseProcessor::spaceExist(GraphSpaceID spaceId) { } template -Status BaseProcessor::hostsExist(const std::vector &hostsKey) { - for (const auto& hostKey : hostsKey) { - std::string val; - auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, hostKey , &val); - if (ret != kvstore::ResultCode::SUCCEEDED) { - if (ret == kvstore::ResultCode::ERR_KEY_NOT_FOUND) { - nebula::cpp2::HostAddr host = MetaServiceUtils::parseHostKey(hostKey); - std::string ip = NetworkUtils::intToIPv4(host.get_ip()); - int32_t port = host.get_port(); - VLOG(3) << "Error, host IP " << ip << " port " << port - << " not exist"; - return Status::HostNotFound(); - } else { - VLOG(3) << "Unknown Error ,, ret = " << static_cast(ret); - return Status::Error("Unknown error!"); - } - } +Status BaseProcessor::hostExist(const std::string& hostKey) { + std::string val; + auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, hostKey , &val); + if (ret == kvstore::ResultCode::SUCCEEDED) { + return Status::OK(); } - return Status::OK(); + return Status::HostNotFound(); } template diff --git a/src/meta/processors/HBProcessor.cpp b/src/meta/processors/HBProcessor.cpp index 3455e471cde..61edb9f9630 100644 --- a/src/meta/processors/HBProcessor.cpp +++ b/src/meta/processors/HBProcessor.cpp @@ -7,6 +7,11 @@ #include "meta/processors/HBProcessor.h" +DEFINE_int32(expired_hosts_check_interval_sec, 20, + "Check the expired hosts at the interval"); +DEFINE_int32(expired_threshold_sec, 10 * 60, + "Hosts will be expired in this time if no heartbeat received"); + namespace nebula { namespace meta { diff --git a/src/meta/processors/HBProcessor.h b/src/meta/processors/HBProcessor.h index dc7e922569d..a449a52f0c3 100644 --- a/src/meta/processors/HBProcessor.h +++ b/src/meta/processors/HBProcessor.h @@ -7,22 +7,52 @@ #ifndef META_HBPROCESSOR_H_ #define META_HBPROCESSOR_H_ +#include #include "meta/processors/BaseProcessor.h" +#include "meta/ActiveHostsMan.h" +#include "time/TimeUtils.h" + +DECLARE_int32(expired_hosts_check_interval_sec); +DECLARE_int32(expired_threshold_sec); namespace nebula { namespace meta { class HBProcessor : public BaseProcessor { + FRIEND_TEST(HBProcessorTest, HBTest); + public: static HBProcessor* instance(kvstore::KVStore* kvstore) { return new HBProcessor(kvstore); } - void process(const cpp2::HBReq& req); + void process(const cpp2::HBReq& req) { + HostAddr host(req.host.ip, req.host.port); + if (hostExist(MetaServiceUtils::hostKey(host.first, host.second)) + == Status::HostNotFound()) { + LOG(INFO) << "Reject unregistered host " << host << "!"; + resp_.set_code(cpp2::ErrorCode::E_INVALID_HOST); + onFinished(); + return; + } + + LOG(INFO) << "Receive heartbeat from " << host; + HostInfo info; + info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds(); + hostsMan()->updateHostInfo(host, info); + onFinished(); + } private: explicit HBProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} + : BaseProcessor(kvstore) {} + + static ActiveHostsMan* hostsMan() { + static auto hostsMan + = std::make_unique(FLAGS_expired_hosts_check_interval_sec, + FLAGS_expired_threshold_sec); + return hostsMan.get(); + } }; } // namespace meta diff --git a/src/meta/processors/RemoveHostsProcessor.cpp b/src/meta/processors/RemoveHostsProcessor.cpp index 589dbc27886..1fe5ea7eb8c 100644 --- a/src/meta/processors/RemoveHostsProcessor.cpp +++ b/src/meta/processors/RemoveHostsProcessor.cpp @@ -12,20 +12,18 @@ namespace meta { void RemoveHostsProcessor::process(const cpp2::RemoveHostsReq& req) { folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); - std::vector hostsKey; for (auto& h : req.get_hosts()) { - hostsKey.emplace_back(MetaServiceUtils::hostKey(h.ip, h.port)); - } - - auto hostsRet = hostsExist(hostsKey); - if (!hostsRet.ok()) { - resp_.set_code(to(std::move(hostsRet))); - onFinished(); - return; + auto hostKey = MetaServiceUtils::hostKey(h.ip, h.port); + auto ret = hostExist(hostKey); + if (!ret.ok()) { + LOG(WARNING) << "The host [" << h.ip << ":" << h.port << "] not existed!"; + resp_.set_code(to(ret)); + onFinished(); + return; + } + hostsKey.emplace_back(std::move(hostKey)); } - - LOG(INFO) << "Remove hosts "; resp_.set_code(cpp2::ErrorCode::SUCCEEDED); doMultiRemove(std::move(hostsKey)); } diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 20c91288845..d60ebf8d327 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -64,6 +64,32 @@ nebula_link_libraries( ) nebula_add_test(processor_test) +add_executable( + hb_processor_test + HBProcessorTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +nebula_link_libraries( + hb_processor_test + proxygenhttpserver + proxygenlib + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + gtest +) +nebula_add_test(hb_processor_test) + add_executable( meta_client_test MetaClientTest.cpp diff --git a/src/meta/test/HBProcessorTest.cpp b/src/meta/test/HBProcessorTest.cpp new file mode 100644 index 00000000000..749e420e25d --- /dev/null +++ b/src/meta/test/HBProcessorTest.cpp @@ -0,0 +1,93 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include +#include +#include +#include "fs/TempDir.h" +#include "meta/test/TestUtils.h" +#include +#include "meta/processors/HBProcessor.h" + +DECLARE_int32(expired_hosts_check_interval_sec); +DECLARE_int32(expired_threshold_sec); + +namespace nebula { +namespace meta { + +using nebula::cpp2::SupportedType; +using apache::thrift::FragileConstructor::FRAGILE; + +TEST(HBProcessorTest, HBTest) { + fs::TempDir rootPath("/tmp/HBTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + { + std::vector thriftHosts; + for (auto i = 0; i < 10; i++) { + thriftHosts.emplace_back(FRAGILE, i, i); + } + cpp2::AddHostsReq req; + req.set_hosts(std::move(thriftHosts)); + auto* processor = AddHostsProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + } + { + cpp2::ListHostsReq req; + auto* processor = ListHostsProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(10, resp.hosts.size()); + for (auto i = 0; i < 10; i++) { + ASSERT_EQ(i, resp.hosts[i].ip); + ASSERT_EQ(i, resp.hosts[i].port); + } + } + { + FLAGS_expired_hosts_check_interval_sec = 1; + FLAGS_expired_threshold_sec = 1; + for (auto i = 0; i < 5; i++) { + cpp2::HBReq req; + nebula::cpp2::HostAddr thriftHost(FRAGILE, i, i); + req.set_host(std::move(thriftHost)); + auto* processor = HBProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + } + auto hosts = HBProcessor::hostsMan()->getActiveHosts(); + ASSERT_EQ(5, hosts.size()); + sleep(3); + ASSERT_EQ(0, HBProcessor::hostsMan()->getActiveHosts().size()); + + LOG(INFO) << "Test for invalid host!"; + cpp2::HBReq req; + nebula::cpp2::HostAddr thriftHost(FRAGILE, 11, 11); + req.set_host(std::move(thriftHost)); + auto* processor = HBProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::E_INVALID_HOST, resp.code); + } +} + +} // namespace meta +} // namespace nebula + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + From 980ebd4d34ab2dacc10bc17f9b076680bae5a4b2 Mon Sep 17 00:00:00 2001 From: heng Date: Thu, 9 May 2019 12:37:09 +0800 Subject: [PATCH 3/3] Address comments --- src/meta/ActiveHostsMan.h | 5 +++++ src/meta/processors/RemoveHostsProcessor.cpp | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index 8b265e697ee..c964a5810d6 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -38,6 +38,11 @@ class ActiveHostsMan final { ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds) : intervalSeconds_(intervalSeconds) , expirationInSeconds_(expiredSeconds) { + CHECK_GT(intervalSeconds, 0) + << "intervalSeconds " << intervalSeconds << " should > 0!"; + CHECK_GE(expiredSeconds, intervalSeconds) + << "expiredSeconds " << expiredSeconds + << " should >= intervalSeconds " << intervalSeconds; CHECK(checkThread_.start()); checkThread_.addTimerTask(intervalSeconds * 1000, intervalSeconds * 1000, diff --git a/src/meta/processors/RemoveHostsProcessor.cpp b/src/meta/processors/RemoveHostsProcessor.cpp index 1fe5ea7eb8c..f83a806160f 100644 --- a/src/meta/processors/RemoveHostsProcessor.cpp +++ b/src/meta/processors/RemoveHostsProcessor.cpp @@ -17,7 +17,7 @@ void RemoveHostsProcessor::process(const cpp2::RemoveHostsReq& req) { auto hostKey = MetaServiceUtils::hostKey(h.ip, h.port); auto ret = hostExist(hostKey); if (!ret.ok()) { - LOG(WARNING) << "The host [" << h.ip << ":" << h.port << "] not existed!"; + LOG(WARNING) << "The host " << HostAddr(h.ip, h.port) << " not existed!"; resp_.set_code(to(ret)); onFinished(); return;