From 468c67044372fa820dc1f1c496b0b214bbd3b23b Mon Sep 17 00:00:00 2001 From: Doodle Date: Tue, 28 May 2019 15:00:43 +0800 Subject: [PATCH 1/3] Support displaying online hosts in "SHOW HOSTS" command [440] --- src/graph/ShowExecutor.cpp | 10 +-- src/graph/test/SchemaTest.cpp | 6 +- src/interface/meta.thrift | 13 +++- src/meta/client/MetaClient.cpp | 20 +++++- src/meta/client/MetaClient.h | 5 +- src/meta/processors/BaseProcessor.inl | 3 + .../partsMan/ListHostsProcessor.cpp | 50 ++++++++++++++- .../processors/partsMan/ListHostsProcessor.h | 5 ++ src/meta/test/HBProcessorTest.cpp | 11 ++-- src/meta/test/MetaClientTest.cpp | 12 +++- src/meta/test/ProcessorTest.cpp | 62 +++++++++++++++++-- src/meta/test/TestUtils.h | 4 +- 12 files changed, 171 insertions(+), 30 deletions(-) diff --git a/src/graph/ShowExecutor.cpp b/src/graph/ShowExecutor.cpp index 44d97635b1d..60c7e3884f7 100644 --- a/src/graph/ShowExecutor.cpp +++ b/src/graph/ShowExecutor.cpp @@ -74,13 +74,15 @@ void ShowExecutor::showHosts() { header.push_back("Ip"); header.push_back("Port"); + header.push_back("Status"); resp_->set_column_names(std::move(header)); - for (auto &host : retShowHosts) { + for (auto &status : retShowHosts) { std::vector row; - row.resize(2); - row[0].set_str(NetworkUtils::ipFromHostAddr(host)); - row[1].set_str(folly::to(NetworkUtils::portFromHostAddr(host))); + row.resize(3); + row[0].set_str(NetworkUtils::ipFromHostAddr(status.first)); + row[1].set_str(folly::to(NetworkUtils::portFromHostAddr(status.first))); + row[2].set_str(status.second); rows.emplace_back(); rows.back().set_columns(std::move(row)); } diff --git a/src/graph/test/SchemaTest.cpp b/src/graph/test/SchemaTest.cpp index 7dd1e56c3a5..9f01307bad3 100644 --- a/src/graph/test/SchemaTest.cpp +++ b/src/graph/test/SchemaTest.cpp @@ -42,9 +42,9 @@ TEST_F(SchemaTest, metaCommunication) { cpp2::ExecutionResponse resp; std::string query = "SHOW HOSTS"; client->execute(query, resp); - std::vector> expected{ - {"127.0.0.1", "1000"}, - {"127.0.0.1", "1100"}, + std::vector> expected{ + {"127.0.0.1", "1000", "online"}, + {"127.0.0.1", "1100", "online"}, }; ASSERT_TRUE(verifyResult(resp, expected)); } diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 4296af1002c..a980f44148d 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -89,6 +89,17 @@ struct EdgeItem { 4: common.Schema schema, } +enum HostStatus { + ONLINE = 0x00, + OFFLINE = 0x01, + UNKNOWN = 0x02, +} (cpp.enum_strict) + +struct HostItem { + 1: common.HostAddr hostAddr, + 2: HostStatus status, +} + struct ExecResp { 1: ErrorCode code, // For custom kv operations, it is useless. @@ -220,7 +231,7 @@ struct ListHostsResp { 1: ErrorCode code, // Valid if ret equals E_LEADER_CHANGED. 2: common.HostAddr leader, - 3: list hosts, + 3: list hosts, } struct RemoveHostsReq { diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index 31c25afc61b..949775b5e79 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -264,6 +264,21 @@ std::vector MetaClient::to(const std::vector& return hosts; } +std::vector MetaClient::toHostStatus(const std::vector& tHosts) { + std::vector hosts; + hosts.resize(tHosts.size()); + std::transform(tHosts.begin(), tHosts.end(), hosts.begin(), [](const auto& h) { + switch (h.get_status()) { + case cpp2::HostStatus::ONLINE: + return HostStatus(HostAddr(h.hostAddr.get_ip(), h.hostAddr.get_port()), "online"); + case cpp2::HostStatus::OFFLINE: + return HostStatus(HostAddr(h.hostAddr.get_ip(), h.hostAddr.get_port()), "offline"); + default: + return HostStatus(HostAddr(h.hostAddr.get_ip(), h.hostAddr.get_port()), "unknown"); + } + }); + return hosts; +} std::vector MetaClient::toSpaceIdName(const std::vector& tIdNames) { std::vector idNames; @@ -455,13 +470,12 @@ folly::Future> MetaClient::addHosts(const std::vector& }, true); } - -folly::Future>> MetaClient::listHosts() { +folly::Future>> MetaClient::listHosts() { cpp2::ListHostsReq req; return getResponse(std::move(req), [] (auto client, auto request) { return client->future_listHosts(request); }, [this] (cpp2::ListHostsResp&& resp) -> decltype(auto) { - return this->to(resp.hosts); + return this->toHostStatus(resp.hosts); }); } diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index 78239f305e3..886df0a025c 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -22,6 +22,7 @@ namespace meta { using PartsAlloc = std::unordered_map>; using SpaceIdName = std::pair; +using HostStatus = std::pair; // struct for in cache using TagIDSchemas = std::unordered_map, @@ -90,7 +91,7 @@ class MetaClient { folly::Future> addHosts(const std::vector& hosts); - folly::Future>> + folly::Future>> listHosts(); folly::Future> @@ -230,6 +231,8 @@ class MetaClient { std::vector to(const std::vector& hosts); + std::vector toHostStatus(const std::vector& thosts); + std::vector toSpaceIdName(const std::vector& tIdNames); PartsMap doGetPartsMap(const HostAddr& host, diff --git a/src/meta/processors/BaseProcessor.inl b/src/meta/processors/BaseProcessor.inl index 12ae49538ec..0246b6e115e 100644 --- a/src/meta/processors/BaseProcessor.inl +++ b/src/meta/processors/BaseProcessor.inl @@ -6,6 +6,9 @@ #include "meta/MetaServiceUtils.h" #include "meta/processors/BaseProcessor.h" +#include "meta/ActiveHostsMan.h" + +DECLARE_bool(hosts_whitelist_enabled); namespace nebula { namespace meta { diff --git a/src/meta/processors/partsMan/ListHostsProcessor.cpp b/src/meta/processors/partsMan/ListHostsProcessor.cpp index 4d4d69b5bd5..129e9f25bf2 100644 --- a/src/meta/processors/partsMan/ListHostsProcessor.cpp +++ b/src/meta/processors/partsMan/ListHostsProcessor.cpp @@ -12,7 +12,7 @@ namespace meta { void ListHostsProcessor::process(const cpp2::ListHostsReq& req) { UNUSED(req); folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); - auto ret = allHosts(); + auto ret = allHostsWithStatus(); if (!ret.ok()) { LOG(ERROR) << "List Hosts Failed : No hosts"; resp_.set_code(cpp2::ErrorCode::E_NO_HOSTS); @@ -23,6 +23,54 @@ void ListHostsProcessor::process(const cpp2::ListHostsReq& req) { onFinished(); } +StatusOr> ListHostsProcessor::allHostsWithStatus() { + std::vector hostItems; + auto hosts = ActiveHostsManHolder::hostsMan()->getActiveHosts(); + std::vector activeHosts; + activeHosts.resize(hosts.size()); + std::transform(hosts.begin(), hosts.end(), activeHosts.begin(), [](const auto& h) { + nebula::cpp2::HostAddr th; + th.set_ip(h.first); + th.set_port(h.second); + return th; + }); + + if (!FLAGS_hosts_whitelist_enabled) { + std::for_each(activeHosts.begin(), activeHosts.end(), [&](auto& h) { + cpp2::HostItem item; + item.set_hostAddr(h); + item.set_status(cpp2::HostStatus::ONLINE); + hostItems.emplace_back(item); + }); + return hostItems; + } + + const auto& prefix = MetaServiceUtils::hostPrefix(); + std::unique_ptr iter; + auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); + if (ret != kvstore::ResultCode::SUCCEEDED) { + return Status::Error("Can't find any hosts"); + } + + while (iter->valid()) { + cpp2::HostItem item; + nebula::cpp2::HostAddr host; + auto hostAddrPiece = iter->key().subpiece(prefix.size()); + memcpy(&host, hostAddrPiece.data(), hostAddrPiece.size()); + item.set_hostAddr(host); + if (std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end()) { + item.set_status(cpp2::HostStatus::ONLINE); + } else { + item.set_status(cpp2::HostStatus::OFFLINE); + } + hostItems.emplace_back(item); + iter->next(); + } + + return hostItems; +} + + } // namespace meta } // namespace nebula diff --git a/src/meta/processors/partsMan/ListHostsProcessor.h b/src/meta/processors/partsMan/ListHostsProcessor.h index 87ebe2376db..0ed6bf9ecbb 100644 --- a/src/meta/processors/partsMan/ListHostsProcessor.h +++ b/src/meta/processors/partsMan/ListHostsProcessor.h @@ -23,6 +23,11 @@ class ListHostsProcessor : public BaseProcessor { private: explicit ListHostsProcessor(kvstore::KVStore* kvstore) : BaseProcessor(kvstore) {} + + /** + * Get all hosts with online/offline status. + * */ + StatusOr> allHostsWithStatus(); }; } // namespace meta diff --git a/src/meta/test/HBProcessorTest.cpp b/src/meta/test/HBProcessorTest.cpp index 75f47640f86..45030573349 100644 --- a/src/meta/test/HBProcessorTest.cpp +++ b/src/meta/test/HBProcessorTest.cpp @@ -24,8 +24,9 @@ using apache::thrift::FragileConstructor::FRAGILE; TEST(HBProcessorTest, HBTest) { fs::TempDir rootPath("/tmp/HBTest.XXXXXX"); - auto kv = TestUtils::initKV(rootPath.path()); - + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + FLAGS_expired_hosts_check_interval_sec = 1; + FLAGS_expired_threshold_sec = 1; { std::vector thriftHosts; for (auto i = 0; i < 10; i++) { @@ -47,13 +48,11 @@ TEST(HBProcessorTest, HBTest) { auto resp = std::move(f).get(); ASSERT_EQ(10, resp.hosts.size()); for (auto i = 0; i < 10; i++) { - ASSERT_EQ(i, resp.hosts[i].ip); - ASSERT_EQ(i, resp.hosts[i].port); + ASSERT_EQ(i, resp.hosts[i].hostAddr.ip); + ASSERT_EQ(i, resp.hosts[i].hostAddr.port); } } { - FLAGS_expired_hosts_check_interval_sec = 1; - FLAGS_expired_threshold_sec = 1; for (auto i = 0; i < 5; i++) { cpp2::HBReq req; nebula::cpp2::HostAddr thriftHost(FRAGILE, i, i); diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index f7ee7814dd3..71dd9e59346 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -50,7 +50,9 @@ TEST(MetaClientTest, InterfacesTest) { TestUtils::registerHB(hosts); auto ret = client->listHosts().get(); ASSERT_TRUE(ret.ok()); - ASSERT_EQ(hosts, ret.value()); + for (auto i = 0u; i < hosts.size(); i++) { + ASSERT_EQ(hosts[i], ret.value()[i].first); + } } { // Test createSpace, listSpaces, getPartsAlloc. @@ -403,7 +405,9 @@ TEST(MetaClientTest, DiffTest) { TestUtils::registerHB(hosts); auto ret = client->listHosts().get(); ASSERT_TRUE(ret.ok()); - ASSERT_EQ(hosts, ret.value()); + for (auto i = 0u; i < hosts.size(); i++) { + ASSERT_EQ(hosts[i], ret.value()[i].first); + } } { // Test Create Space and List Spaces @@ -452,7 +456,9 @@ TEST(MetaClientTest, HeartbeatTest) { ASSERT_TRUE(r.ok()); auto ret = client->listHosts().get(); ASSERT_TRUE(ret.ok()); - ASSERT_EQ(hosts, ret.value()); + for (auto i = 0u; i < hosts.size(); i++) { + ASSERT_EQ(hosts[i], ret.value()[i].first); + } } sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(1, ActiveHostsManHolder::hostsMan()->getActiveHosts().size()); diff --git a/src/meta/test/ProcessorTest.cpp b/src/meta/test/ProcessorTest.cpp index 9ec12af1a2e..a7669cc04e5 100644 --- a/src/meta/test/ProcessorTest.cpp +++ b/src/meta/test/ProcessorTest.cpp @@ -41,10 +41,12 @@ namespace meta { using nebula::cpp2::SupportedType; using apache::thrift::FragileConstructor::FRAGILE; + TEST(ProcessorTest, AddHostsTest) { fs::TempDir rootPath("/tmp/AddHostsTest.XXXXXX"); - auto kv = TestUtils::initKV(rootPath.path()); - + FLAGS_expired_hosts_check_interval_sec = 1; + FLAGS_expired_threshold_sec = 1; + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); { std::vector thriftHosts; for (auto i = 0; i < 10; i++) { @@ -66,8 +68,8 @@ TEST(ProcessorTest, AddHostsTest) { auto resp = std::move(f).get(); ASSERT_EQ(10, resp.hosts.size()); for (auto i = 0; i < 10; i++) { - ASSERT_EQ(i, resp.hosts[i].ip); - ASSERT_EQ(i, resp.hosts[i].port); + ASSERT_EQ(i, resp.hosts[i].hostAddr.ip); + ASSERT_EQ(i, resp.hosts[i].hostAddr.port); } } { @@ -91,8 +93,8 @@ TEST(ProcessorTest, AddHostsTest) { auto resp = std::move(f).get(); ASSERT_EQ(20, resp.hosts.size()); for (auto i = 0; i < 20; i++) { - ASSERT_EQ(i, resp.hosts[i].ip); - ASSERT_EQ(i, resp.hosts[i].port); + ASSERT_EQ(i, resp.hosts[i].hostAddr.ip); + ASSERT_EQ(i, resp.hosts[i].hostAddr.port); } } { @@ -118,6 +120,54 @@ TEST(ProcessorTest, AddHostsTest) { } } +TEST(ProcessorTest, ListHostsTest) { + fs::TempDir rootPath("/tmp/ListHostsTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + { + std::vector thriftHosts; + std::vector hosts; + for (auto i = 0; i < 10; i++) { + thriftHosts.emplace_back(FRAGILE, i, i); + hosts.emplace_back(i, i); + } + meta::TestUtils::registerHB(hosts); + + cpp2::AddHostsReq req; + req.set_hosts(std::move(thriftHosts)); + auto* processor = AddHostsProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + } + { + cpp2::ListHostsReq req; + auto* processor = ListHostsProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(10, resp.hosts.size()); + for (auto i = 0; i < 10; i++) { + ASSERT_EQ(i, resp.hosts[i].hostAddr.ip); + ASSERT_EQ(i, resp.hosts[i].hostAddr.port); + ASSERT_EQ(cpp2::HostStatus::ONLINE, resp.hosts[i].status); + } + } + { + sleep(FLAGS_expired_threshold_sec + 1); + cpp2::ListHostsReq req; + auto* processor = ListHostsProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(10, resp.hosts.size()); + for (auto i = 0; i < 10; i++) { + ASSERT_EQ(i, resp.hosts[i].hostAddr.ip); + ASSERT_EQ(i, resp.hosts[i].hostAddr.port); + ASSERT_EQ(cpp2::HostStatus::OFFLINE, resp.hosts[i].status); + } + } +} TEST(ProcessorTest, CreateSpaceTest) { fs::TempDir rootPath("/tmp/CreateSpaceTest.XXXXXX"); diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index 8dd3a7b5ed8..ea0a907fb72 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -102,8 +102,8 @@ class TestUtils { auto resp = std::move(f).get(); EXPECT_EQ(hosts.size(), resp.hosts.size()); for (decltype(hosts.size()) i = 0; i < hosts.size(); i++) { - EXPECT_EQ(hosts[i].first, resp.hosts[i].ip); - EXPECT_EQ(hosts[i].second, resp.hosts[i].port); + EXPECT_EQ(hosts[i].first, resp.hosts[i].hostAddr.ip); + EXPECT_EQ(hosts[i].second, resp.hosts[i].hostAddr.port); } } return hosts.size(); From ea19b8eddb9d1c733c0b36291fe08f0336a2eb58 Mon Sep 17 00:00:00 2001 From: Doodle Date: Tue, 4 Jun 2019 12:19:57 +0800 Subject: [PATCH 2/3] remove ActivHostsManHolder, store host info in rocksdb --- src/daemons/MetaDaemon.cpp | 3 + src/graph/test/SchemaTest.cpp | 4 +- src/meta/ActiveHostsMan.cpp | 124 ++++++++++++++++++ src/meta/ActiveHostsMan.h | 77 +++-------- src/meta/CMakeLists.txt | 1 + src/meta/MetaServiceUtils.cpp | 11 +- src/meta/MetaServiceUtils.h | 4 +- src/meta/processors/BaseProcessor.h | 5 +- src/meta/processors/BaseProcessor.inl | 3 - src/meta/processors/admin/HBProcessor.cpp | 3 +- src/meta/processors/admin/HBProcessor.h | 1 - .../processors/partsMan/AddHostsProcessor.cpp | 3 +- .../partsMan/CreateSpaceProcessor.cpp | 2 +- .../partsMan/ListHostsProcessor.cpp | 22 +--- src/meta/test/ActiveHostsManTest.cpp | 6 +- src/meta/test/CMakeLists.txt | 11 +- src/meta/test/HBProcessorTest.cpp | 4 +- src/meta/test/MetaClientTest.cpp | 2 +- src/meta/test/ProcessorTest.cpp | 23 +++- src/meta/test/TestUtils.h | 12 +- src/storage/test/StorageClientTest.cpp | 2 +- 21 files changed, 211 insertions(+), 112 deletions(-) create mode 100644 src/meta/ActiveHostsMan.cpp diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index 2d6e86c570e..45b4b845355 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -14,6 +14,7 @@ #include "thread/GenericThreadPool.h" #include "kvstore/PartManager.h" #include "kvstore/NebulaStore.h" +#include "meta/ActiveHostsMan.h" using nebula::ProcessUtils; using nebula::Status; @@ -129,6 +130,8 @@ int main(int argc, char *argv[]) { localhost); auto handler = std::make_shared(kvstore.get()); + auto activeHostsMan = nebula::meta::ActiveHostsMan::instance(kvstore.get()); + activeHostsMan->loadHostMap(); nebula::operator<<(operator<<(LOG(INFO), "The meta deamon start on "), localhost); try { diff --git a/src/graph/test/SchemaTest.cpp b/src/graph/test/SchemaTest.cpp index 9f01307bad3..4a19a8263db 100644 --- a/src/graph/test/SchemaTest.cpp +++ b/src/graph/test/SchemaTest.cpp @@ -43,8 +43,8 @@ TEST_F(SchemaTest, metaCommunication) { std::string query = "SHOW HOSTS"; client->execute(query, resp); std::vector> expected{ - {"127.0.0.1", "1000", "online"}, - {"127.0.0.1", "1100", "online"}, + {"127.0.0.1", "1000", "offline"}, + {"127.0.0.1", "1100", "offline"}, }; ASSERT_TRUE(verifyResult(resp, expected)); } diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp new file mode 100644 index 00000000000..7c7086c849f --- /dev/null +++ b/src/meta/ActiveHostsMan.cpp @@ -0,0 +1,124 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "meta/ActiveHostsMan.h" +#include "meta/MetaServiceUtils.h" +#include "meta/processors/BaseProcessor.h" + +namespace nebula { +namespace meta { + +ActiveHostsMan::ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds, + kvstore::KVStore* kv) + : intervalSeconds_(intervalSeconds) + , expirationInSeconds_(expiredSeconds) { + if (kv != nullptr) { + kvstore_ = kv; + } + + CHECK_GT(intervalSeconds, 0) + << "intervalSeconds " << intervalSeconds << " should > 0!"; + CHECK_GE(expiredSeconds, intervalSeconds) + << "expiredSeconds " << expiredSeconds + << " should >= intervalSeconds " << intervalSeconds; + CHECK(checkThread_.start()); + checkThread_.addTimerTask(intervalSeconds * 1000, + intervalSeconds * 1000, + &ActiveHostsMan::cleanExpiredHosts, + this); +} + +void ActiveHostsMan::updateHostInfo(const HostAddr& hostAddr, const HostInfo& info) { + std::vector data; + { + folly::RWSpinLock::ReadHolder rh(&lock_); + auto it = hostsMap_.find(hostAddr); + if (it == hostsMap_.end()) { + folly::RWSpinLock::UpgradedHolder uh(&lock_); + hostsMap_.emplace(hostAddr, std::move(info)); + data.emplace_back(MetaServiceUtils::hostKey(hostAddr.first, hostAddr.second), + MetaServiceUtils::hostValOnline()); + } else { + it->second.lastHBTimeInSec_ = info.lastHBTimeInSec_; + } + } + if (kvstore_ != nullptr && !data.empty()) { + folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); + kvstore_->asyncMultiPut(kDefaultSpaceId_, kDefaultPartId_, std::move(data), + [] (kvstore::ResultCode code, HostAddr leader) { + UNUSED(leader); + CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); + }); + } +} + +std::vector ActiveHostsMan::getActiveHosts() { + std::vector hosts; + folly::RWSpinLock::ReadHolder rh(&lock_); + hosts.resize(hostsMap_.size()); + std::transform(hostsMap_.begin(), hostsMap_.end(), hosts.begin(), + [](const auto& entry) -> decltype(auto) { + return entry.first; + }); + return hosts; +} + +void ActiveHostsMan::loadHostMap() { + if (kvstore_ == nullptr) { + return; + } + + const auto& prefix = MetaServiceUtils::hostPrefix(); + std::unique_ptr iter; + auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); + if (ret != kvstore::ResultCode::SUCCEEDED) { + return; + } + + while (iter->valid()) { + auto host = MetaServiceUtils::parseHostKey(iter->key()); + HostInfo info; + info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds(); + if (iter->val() == MetaServiceUtils::hostValOnline()) { + LOG(INFO) << "load host " << host.ip << ":" << host.port; + updateHostInfo({host.ip, host.port}, info); + } + iter->next(); + } +} + +void ActiveHostsMan::cleanExpiredHosts() { + int64_t now = time::TimeUtils::nowInSeconds(); + std::vector data; + { + folly::RWSpinLock::WriteHolder rh(&lock_); + auto it = hostsMap_.begin(); + while (it != hostsMap_.end()) { + if ((now - it->second.lastHBTimeInSec_) > expirationInSeconds_) { + LOG(INFO) << it->first << " expired! last hb time " + << it->second.lastHBTimeInSec_; + data.emplace_back(MetaServiceUtils::hostKey(it->first.first, it->first.second), + MetaServiceUtils::hostValOffline()); + it = hostsMap_.erase(it); + } else { + it++; + } + } + } + + if (!data.empty() && kvstore_ != nullptr) { + folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); + LOG(INFO) << "set " << data.size() << " expired hosts to offline in meta rocksdb"; + kvstore_->asyncMultiPut(kDefaultSpaceId_, kDefaultPartId_, std::move(data), + [] (kvstore::ResultCode code, HostAddr leader) { + CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); + UNUSED(leader); + }); + } +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index 030c6173c77..fcfbda826b2 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -11,6 +11,7 @@ #include #include "thread/GenericWorker.h" #include "time/TimeUtils.h" +#include "kvstore/KVStore.h" DECLARE_int32(expired_hosts_check_interval_sec); DECLARE_int32(expired_threshold_sec); @@ -34,38 +35,15 @@ struct HostInfo { int64_t lastHBTimeInSec_ = 0; }; -class ActiveHostsMan; - -class ActiveHostsManHolder final { -public: - ActiveHostsManHolder() = delete; - ~ActiveHostsManHolder() = delete; - - static ActiveHostsMan* hostsMan() { - static auto hostsMan - = std::make_unique(FLAGS_expired_hosts_check_interval_sec, - FLAGS_expired_threshold_sec); - return hostsMan.get(); - } -}; - class ActiveHostsMan final { FRIEND_TEST(ActiveHostsManTest, NormalTest); public: - ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds) - : intervalSeconds_(intervalSeconds) - , expirationInSeconds_(expiredSeconds) { - CHECK_GT(intervalSeconds, 0) - << "intervalSeconds " << intervalSeconds << " should > 0!"; - CHECK_GE(expiredSeconds, intervalSeconds) - << "expiredSeconds " << expiredSeconds - << " should >= intervalSeconds " << intervalSeconds; - CHECK(checkThread_.start()); - checkThread_.addTimerTask(intervalSeconds * 1000, - intervalSeconds * 1000, - &ActiveHostsMan::cleanExpiredHosts, - this); + static ActiveHostsMan* instance(kvstore::KVStore* kv = nullptr) { + static auto activeHostsMan = std::unique_ptr( + new ActiveHostsMan(FLAGS_expired_hosts_check_interval_sec, + FLAGS_expired_threshold_sec, kv)); + return activeHostsMan.get(); } ~ActiveHostsMan() { @@ -73,27 +51,11 @@ class ActiveHostsMan final { checkThread_.wait(); } - void updateHostInfo(const HostAddr& hostAddr, const HostInfo& info) { - folly::RWSpinLock::ReadHolder rh(&lock_); - auto it = hostsMap_.find(hostAddr); - if (it == hostsMap_.end()) { - folly::RWSpinLock::UpgradedHolder uh(&lock_); - hostsMap_.emplace(std::move(hostAddr), std::move(info)); - } else { - it->second.lastHBTimeInSec_ = info.lastHBTimeInSec_; - } - } + void loadHostMap(); - std::vector getActiveHosts() { - std::vector hosts; - folly::RWSpinLock::ReadHolder rh(&lock_); - hosts.resize(hostsMap_.size()); - std::transform(hostsMap_.begin(), hostsMap_.end(), hosts.begin(), - [](const auto& entry) -> decltype(auto) { - return entry.first; - }); - return hosts; - } + void updateHostInfo(const HostAddr& hostAddr, const HostInfo& info); + + std::vector getActiveHosts(); void reset() { folly::RWSpinLock::WriteHolder rh(&lock_); @@ -101,28 +63,19 @@ class ActiveHostsMan final { } protected: - void cleanExpiredHosts() { - int64_t now = time::TimeUtils::nowInSeconds(); - folly::RWSpinLock::WriteHolder rh(&lock_); - auto it = hostsMap_.begin(); - while (it != hostsMap_.end()) { - if ((now - it->second.lastHBTimeInSec_) > expirationInSeconds_) { - LOG(INFO) << it->first << " expired! last hb time " - << it->second.lastHBTimeInSec_; - it = hostsMap_.erase(it); - } else { - it++; - } - } - } + void cleanExpiredHosts(); private: + ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds, kvstore::KVStore* kv = nullptr); + folly::RWSpinLock lock_; std::unordered_map hostsMap_; thread::GenericWorker checkThread_; int32_t intervalSeconds_ = 0; int32_t expirationInSeconds_ = 5 * 60; + kvstore::KVStore* kvstore_ = nullptr; }; + } // namespace meta } // namespace nebula diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index ecd5a67541f..6f0e1fcf61f 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -11,6 +11,7 @@ add_library( meta_service_handler OBJECT MetaServiceHandler.cpp MetaServiceUtils.cpp + ActiveHostsMan.cpp processors/partsMan/AddHostsProcessor.cpp processors/partsMan/ListHostsProcessor.cpp processors/partsMan/RemoveHostsProcessor.cpp diff --git a/src/meta/MetaServiceUtils.cpp b/src/meta/MetaServiceUtils.cpp index 2f4cee941f0..e77adad898b 100644 --- a/src/meta/MetaServiceUtils.cpp +++ b/src/meta/MetaServiceUtils.cpp @@ -18,6 +18,9 @@ const std::string kTagsTable = "__tags__"; // NOLINT const std::string kEdgesTable = "__edges__"; // NOLINT const std::string kIndexTable = "__index__"; // NOLINT +const std::string kHostOnline = "Online"; // NOLINT +const std::string kHostOffline = "Offline"; // NOLINT + std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) { std::string key; key.reserve(128); @@ -103,8 +106,12 @@ std::string MetaServiceUtils::hostKey(IPv4 ip, Port port) { return key; } -std::string MetaServiceUtils::hostVal() { - return ""; +std::string MetaServiceUtils::hostValOnline() { + return kHostOnline; +} + +std::string MetaServiceUtils::hostValOffline() { + return kHostOffline; } const std::string& MetaServiceUtils::hostPrefix() { diff --git a/src/meta/MetaServiceUtils.h b/src/meta/MetaServiceUtils.h index cfff8e7b88c..497f776927a 100644 --- a/src/meta/MetaServiceUtils.h +++ b/src/meta/MetaServiceUtils.h @@ -47,7 +47,9 @@ class MetaServiceUtils final { static std::string hostKey(IPv4 ip, Port port); - static std::string hostVal(); + static std::string hostValOnline(); + + static std::string hostValOffline(); static const std::string& hostPrefix(); diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index 032b81bc461..b969a3098b1 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -23,6 +23,9 @@ namespace meta { using nebula::network::NetworkUtils; +const PartitionID kDefaultPartId_ = 0; +const GraphSpaceID kDefaultSpaceId_ = 0; + class LockUtils { public: LockUtils() = delete; @@ -204,8 +207,6 @@ class BaseProcessor { kvstore::KVStore* kvstore_ = nullptr; RESP resp_; folly::Promise promise_; - const PartitionID kDefaultPartId_ = 0; - const GraphSpaceID kDefaultSpaceId_ = 0; }; } // namespace meta diff --git a/src/meta/processors/BaseProcessor.inl b/src/meta/processors/BaseProcessor.inl index 0246b6e115e..12ae49538ec 100644 --- a/src/meta/processors/BaseProcessor.inl +++ b/src/meta/processors/BaseProcessor.inl @@ -6,9 +6,6 @@ #include "meta/MetaServiceUtils.h" #include "meta/processors/BaseProcessor.h" -#include "meta/ActiveHostsMan.h" - -DECLARE_bool(hosts_whitelist_enabled); namespace nebula { namespace meta { diff --git a/src/meta/processors/admin/HBProcessor.cpp b/src/meta/processors/admin/HBProcessor.cpp index 4844e276549..54b7b61e565 100644 --- a/src/meta/processors/admin/HBProcessor.cpp +++ b/src/meta/processors/admin/HBProcessor.cpp @@ -6,6 +6,7 @@ #include "meta/processors/admin/HBProcessor.h" +#include "meta/ActiveHostsMan.h" DEFINE_int32(expired_hosts_check_interval_sec, 20, "Check the expired hosts at the interval"); @@ -30,7 +31,7 @@ void HBProcessor::process(const cpp2::HBReq& req) { LOG(INFO) << "Receive heartbeat from " << host; HostInfo info; info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds(); - ActiveHostsManHolder::hostsMan()->updateHostInfo(host, info); + ActiveHostsMan::instance()->updateHostInfo(host, info); onFinished(); } diff --git a/src/meta/processors/admin/HBProcessor.h b/src/meta/processors/admin/HBProcessor.h index a1206bada38..7ec7ef1d574 100644 --- a/src/meta/processors/admin/HBProcessor.h +++ b/src/meta/processors/admin/HBProcessor.h @@ -9,7 +9,6 @@ #include #include "meta/processors/BaseProcessor.h" -#include "meta/ActiveHostsMan.h" #include "time/TimeUtils.h" diff --git a/src/meta/processors/partsMan/AddHostsProcessor.cpp b/src/meta/processors/partsMan/AddHostsProcessor.cpp index 55d378188a5..2f708eb836d 100644 --- a/src/meta/processors/partsMan/AddHostsProcessor.cpp +++ b/src/meta/processors/partsMan/AddHostsProcessor.cpp @@ -13,7 +13,8 @@ void AddHostsProcessor::process(const cpp2::AddHostsReq& req) { folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); std::vector data; for (auto& h : req.get_hosts()) { - data.emplace_back(MetaServiceUtils::hostKey(h.ip, h.port), MetaServiceUtils::hostVal()); + data.emplace_back(MetaServiceUtils::hostKey(h.ip, h.port), + MetaServiceUtils::hostValOffline()); } doPut(std::move(data)); } diff --git a/src/meta/processors/partsMan/CreateSpaceProcessor.cpp b/src/meta/processors/partsMan/CreateSpaceProcessor.cpp index 639f224581b..a3c8de727f6 100644 --- a/src/meta/processors/partsMan/CreateSpaceProcessor.cpp +++ b/src/meta/processors/partsMan/CreateSpaceProcessor.cpp @@ -23,7 +23,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { return; } CHECK_EQ(Status::SpaceNotFound(), spaceRet.status()); - auto hosts = ActiveHostsManHolder::hostsMan()->getActiveHosts(); + auto hosts = ActiveHostsMan::instance()->getActiveHosts(); if (hosts.empty()) { LOG(ERROR) << "Create Space Failed : No Hosts!"; resp_.set_code(cpp2::ErrorCode::E_NO_HOSTS); diff --git a/src/meta/processors/partsMan/ListHostsProcessor.cpp b/src/meta/processors/partsMan/ListHostsProcessor.cpp index 129e9f25bf2..50d1b4807a2 100644 --- a/src/meta/processors/partsMan/ListHostsProcessor.cpp +++ b/src/meta/processors/partsMan/ListHostsProcessor.cpp @@ -25,26 +25,6 @@ void ListHostsProcessor::process(const cpp2::ListHostsReq& req) { StatusOr> ListHostsProcessor::allHostsWithStatus() { std::vector hostItems; - auto hosts = ActiveHostsManHolder::hostsMan()->getActiveHosts(); - std::vector activeHosts; - activeHosts.resize(hosts.size()); - std::transform(hosts.begin(), hosts.end(), activeHosts.begin(), [](const auto& h) { - nebula::cpp2::HostAddr th; - th.set_ip(h.first); - th.set_port(h.second); - return th; - }); - - if (!FLAGS_hosts_whitelist_enabled) { - std::for_each(activeHosts.begin(), activeHosts.end(), [&](auto& h) { - cpp2::HostItem item; - item.set_hostAddr(h); - item.set_status(cpp2::HostStatus::ONLINE); - hostItems.emplace_back(item); - }); - return hostItems; - } - const auto& prefix = MetaServiceUtils::hostPrefix(); std::unique_ptr iter; auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); @@ -58,7 +38,7 @@ StatusOr> ListHostsProcessor::allHostsWithStatus() { auto hostAddrPiece = iter->key().subpiece(prefix.size()); memcpy(&host, hostAddrPiece.data(), hostAddrPiece.size()); item.set_hostAddr(host); - if (std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end()) { + if (iter->val() == MetaServiceUtils::hostValOnline()) { item.set_status(cpp2::HostStatus::ONLINE); } else { item.set_status(cpp2::HostStatus::OFFLINE); diff --git a/src/meta/test/ActiveHostsManTest.cpp b/src/meta/test/ActiveHostsManTest.cpp index 61ec13ef21d..4970569b76d 100644 --- a/src/meta/test/ActiveHostsManTest.cpp +++ b/src/meta/test/ActiveHostsManTest.cpp @@ -6,18 +6,22 @@ #include "base/Base.h" #include #include "meta/ActiveHostsMan.h" +#include "fs/TempDir.h" +#include "meta/test/TestUtils.h" namespace nebula { namespace meta { TEST(ActiveHostsManTest, NormalTest) { + fs::TempDir rootPath("/tmp/ActiveHostsManTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); auto now = time::TimeUtils::nowInSeconds(); ActiveHostsMan ahMan(1, 1); ahMan.updateHostInfo(HostAddr(0, 0), HostInfo(now)); ahMan.updateHostInfo(HostAddr(0, 1), HostInfo(now)); ahMan.updateHostInfo(HostAddr(0, 2), HostInfo(now)); - ASSERT_EQ(3, ahMan.getActiveHosts().size()); + ahMan.updateHostInfo(HostAddr(0, 0), HostInfo(now + 2)); ASSERT_EQ(3, ahMan.getActiveHosts().size()); { diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 7359c31e9a8..14d31effc34 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -116,12 +116,21 @@ add_executable( $ $ $ + $ + $ + $ + $ + $ + $ + $ ) nebula_link_libraries( active_hosts_man_test + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle gtest ) - nebula_add_test(active_hosts_man_test) diff --git a/src/meta/test/HBProcessorTest.cpp b/src/meta/test/HBProcessorTest.cpp index 45030573349..b7c7107576a 100644 --- a/src/meta/test/HBProcessorTest.cpp +++ b/src/meta/test/HBProcessorTest.cpp @@ -63,10 +63,10 @@ TEST(HBProcessorTest, HBTest) { auto resp = std::move(f).get(); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); } - auto hosts = ActiveHostsManHolder::hostsMan()->getActiveHosts(); + auto hosts = ActiveHostsMan::instance()->getActiveHosts(); ASSERT_EQ(5, hosts.size()); sleep(3); - ASSERT_EQ(0, ActiveHostsManHolder::hostsMan()->getActiveHosts().size()); + ASSERT_EQ(0, ActiveHostsMan::instance()->getActiveHosts().size()); LOG(INFO) << "Test for invalid host!"; cpp2::HBReq req; diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index 71dd9e59346..627a3e82a16 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -461,7 +461,7 @@ TEST(MetaClientTest, HeartbeatTest) { } } sleep(FLAGS_heartbeat_interval_secs + 1); - ASSERT_EQ(1, ActiveHostsManHolder::hostsMan()->getActiveHosts().size()); + ASSERT_EQ(1, ActiveHostsMan::instance()->getActiveHosts().size()); } } // namespace meta diff --git a/src/meta/test/ProcessorTest.cpp b/src/meta/test/ProcessorTest.cpp index a7669cc04e5..0764d917501 100644 --- a/src/meta/test/ProcessorTest.cpp +++ b/src/meta/test/ProcessorTest.cpp @@ -123,14 +123,14 @@ TEST(ProcessorTest, AddHostsTest) { TEST(ProcessorTest, ListHostsTest) { fs::TempDir rootPath("/tmp/ListHostsTest.XXXXXX"); std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + ActiveHostsMan::instance(kv.get()); + std::vector hosts; { std::vector thriftHosts; - std::vector hosts; for (auto i = 0; i < 10; i++) { thriftHosts.emplace_back(FRAGILE, i, i); hosts.emplace_back(i, i); } - meta::TestUtils::registerHB(hosts); cpp2::AddHostsReq req; req.set_hosts(std::move(thriftHosts)); @@ -141,6 +141,22 @@ TEST(ProcessorTest, ListHostsTest) { ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); } { + // add hosts will set host status to offline + cpp2::ListHostsReq req; + auto* processor = ListHostsProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(10, resp.hosts.size()); + for (auto i = 0; i < 10; i++) { + ASSERT_EQ(i, resp.hosts[i].hostAddr.ip); + ASSERT_EQ(i, resp.hosts[i].hostAddr.port); + ASSERT_EQ(cpp2::HostStatus::OFFLINE, resp.hosts[i].status); + } + } + { + // after received heartbeat, host status will become online + meta::TestUtils::registerHB(hosts); cpp2::ListHostsReq req; auto* processor = ListHostsProcessor::instance(kv.get()); auto f = processor->getFuture(); @@ -154,7 +170,8 @@ TEST(ProcessorTest, ListHostsTest) { } } { - sleep(FLAGS_expired_threshold_sec + 1); + // host info expired + sleep(3); cpp2::ListHostsReq req; auto* processor = ListHostsProcessor::instance(kv.get()); auto f = processor->getFuture(); diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index ea0a907fb72..945f3769e87 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -66,12 +66,12 @@ class TestUtils { } static void registerHB(const std::vector& hosts) { - ActiveHostsManHolder::hostsMan()->reset(); - auto now = time::TimeUtils::nowInSeconds(); - for (auto& h : hosts) { - ActiveHostsManHolder::hostsMan()->updateHostInfo(h, HostInfo(now)); - } - } + ActiveHostsMan::instance()->reset(); + auto now = time::TimeUtils::nowInSeconds(); + for (auto& h : hosts) { + ActiveHostsMan::instance()->updateHostInfo(h, HostInfo(now)); + } + } static int32_t createSomeHosts(kvstore::KVStore* kv, std::vector hosts diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp index 3daad55f5dd..aa07397de59 100644 --- a/src/storage/test/StorageClientTest.cpp +++ b/src/storage/test/StorageClientTest.cpp @@ -65,7 +65,7 @@ TEST(StorageClientTest, VerticesInterfacesTest) { LOG(INFO) << "Add hosts and create space...."; auto r = mClient->addHosts({HostAddr(localIp, localDataPort)}).get(); ASSERT_TRUE(r.ok()); - while (meta::ActiveHostsManHolder::hostsMan()->getActiveHosts().size() == 0) { + while (meta::ActiveHostsMan::instance()->getActiveHosts().size() == 0) { usleep(1000); } VLOG(1) << "The storage server has been added to the meta service"; From 703a17f386a9d9293ae97c8097ecd8031e75f9d5 Mon Sep 17 00:00:00 2001 From: Doodle Date: Thu, 6 Jun 2019 14:36:00 +0800 Subject: [PATCH 3/3] Use call_once to init ActiveHostsMan. Rebased on master. --- src/daemons/MetaDaemon.cpp | 3 +- src/meta/ActiveHostsMan.cpp | 12 +++--- src/meta/ActiveHostsMan.h | 14 +++++-- src/meta/processors/BaseProcessor.h | 4 +- src/meta/processors/BaseProcessor.inl | 40 +++++++++---------- .../partsMan/DropSpaceProcessor.cpp | 2 +- .../partsMan/GetPartsAllocProcessor.cpp | 2 +- .../partsMan/ListHostsProcessor.cpp | 2 +- .../partsMan/ListSpacesProcessor.cpp | 2 +- .../schemaMan/AlterEdgeProcessor.cpp | 2 +- .../schemaMan/AlterTagProcessor.cpp | 2 +- .../schemaMan/DropEdgeProcessor.cpp | 4 +- .../processors/schemaMan/DropTagProcessor.cpp | 4 +- .../schemaMan/ListEdgesProcessor.cpp | 2 +- .../schemaMan/ListTagsProcessor.cpp | 2 +- src/meta/test/CMakeLists.txt | 4 ++ src/meta/test/ProcessorTest.cpp | 7 ++-- 17 files changed, 59 insertions(+), 49 deletions(-) diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index 45b4b845355..4a248dc0534 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -130,8 +130,7 @@ int main(int argc, char *argv[]) { localhost); auto handler = std::make_shared(kvstore.get()); - auto activeHostsMan = nebula::meta::ActiveHostsMan::instance(kvstore.get()); - activeHostsMan->loadHostMap(); + nebula::meta::ActiveHostsMan::instance(kvstore.get()); nebula::operator<<(operator<<(LOG(INFO), "The meta deamon start on "), localhost); try { diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 7c7086c849f..3e689b64e99 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -47,9 +47,8 @@ void ActiveHostsMan::updateHostInfo(const HostAddr& hostAddr, const HostInfo& in } if (kvstore_ != nullptr && !data.empty()) { folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); - kvstore_->asyncMultiPut(kDefaultSpaceId_, kDefaultPartId_, std::move(data), - [] (kvstore::ResultCode code, HostAddr leader) { - UNUSED(leader); + kvstore_->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), + [] (kvstore::ResultCode code) { CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); }); } @@ -73,7 +72,7 @@ void ActiveHostsMan::loadHostMap() { const auto& prefix = MetaServiceUtils::hostPrefix(); std::unique_ptr iter; - auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); + auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { return; } @@ -112,10 +111,9 @@ void ActiveHostsMan::cleanExpiredHosts() { if (!data.empty() && kvstore_ != nullptr) { folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); LOG(INFO) << "set " << data.size() << " expired hosts to offline in meta rocksdb"; - kvstore_->asyncMultiPut(kDefaultSpaceId_, kDefaultPartId_, std::move(data), - [] (kvstore::ResultCode code, HostAddr leader) { + kvstore_->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), + [] (kvstore::ResultCode code) { CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); - UNUSED(leader); }); } } diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index fcfbda826b2..81b85c1bcad 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -11,7 +11,7 @@ #include #include "thread/GenericWorker.h" #include "time/TimeUtils.h" -#include "kvstore/KVStore.h" +#include "kvstore/NebulaStore.h" DECLARE_int32(expired_hosts_check_interval_sec); DECLARE_int32(expired_threshold_sec); @@ -37,12 +37,15 @@ struct HostInfo { class ActiveHostsMan final { FRIEND_TEST(ActiveHostsManTest, NormalTest); + FRIEND_TEST(ProcessorTest, ListHostsTest); public: static ActiveHostsMan* instance(kvstore::KVStore* kv = nullptr) { static auto activeHostsMan = std::unique_ptr( new ActiveHostsMan(FLAGS_expired_hosts_check_interval_sec, FLAGS_expired_threshold_sec, kv)); + static std::once_flag initFlag; + std::call_once(initFlag, &ActiveHostsMan::loadHostMap, activeHostsMan.get()); return activeHostsMan.get(); } @@ -51,8 +54,6 @@ class ActiveHostsMan final { checkThread_.wait(); } - void loadHostMap(); - void updateHostInfo(const HostAddr& hostAddr, const HostInfo& info); std::vector getActiveHosts(); @@ -68,6 +69,13 @@ class ActiveHostsMan final { private: ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds, kvstore::KVStore* kv = nullptr); + void loadHostMap(); + + void stopClean() { + checkThread_.stop(); + checkThread_.wait(); + } + folly::RWSpinLock lock_; std::unordered_map hostsMap_; thread::GenericWorker checkThread_; diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index b969a3098b1..3dcccc83f9b 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -23,8 +23,8 @@ namespace meta { using nebula::network::NetworkUtils; -const PartitionID kDefaultPartId_ = 0; -const GraphSpaceID kDefaultSpaceId_ = 0; +const PartitionID kDefaultPartId = 0; +const GraphSpaceID kDefaultSpaceId = 0; class LockUtils { public: diff --git a/src/meta/processors/BaseProcessor.inl b/src/meta/processors/BaseProcessor.inl index 12ae49538ec..5f671200792 100644 --- a/src/meta/processors/BaseProcessor.inl +++ b/src/meta/processors/BaseProcessor.inl @@ -12,8 +12,8 @@ namespace meta { template void BaseProcessor::doPut(std::vector data) { - kvstore_->asyncMultiPut(kDefaultSpaceId_, - kDefaultPartId_, + kvstore_->asyncMultiPut(kDefaultSpaceId, + kDefaultPartId, std::move(data), [this] (kvstore::ResultCode code) { this->resp_.set_code(to(code)); @@ -26,7 +26,7 @@ template StatusOr> BaseProcessor::doPrefix(const std::string& key) { std::unique_ptr iter; - auto code = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, key, &iter); + auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, key, &iter); if (code != kvstore::ResultCode::SUCCEEDED) { return Status::Error("Prefix Failed"); } @@ -37,7 +37,7 @@ BaseProcessor::doPrefix(const std::string& key) { template StatusOr BaseProcessor::doGet(const std::string& key) { std::string value; - auto code = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, key, &value); + auto code = kvstore_->get(kDefaultSpaceId, kDefaultPartId, key, &value); switch (code) { case kvstore::ResultCode::SUCCEEDED: return value; @@ -53,7 +53,7 @@ template StatusOr> BaseProcessor::doMultiGet(const std::vector& keys) { std::vector values; - auto code = kvstore_->multiGet(kDefaultSpaceId_, kDefaultPartId_, keys, &values); + auto code = kvstore_->multiGet(kDefaultSpaceId, kDefaultPartId, keys, &values); if (code != kvstore::ResultCode::SUCCEEDED) { return Status::Error("MultiGet Failed"); } @@ -63,8 +63,8 @@ BaseProcessor::doMultiGet(const std::vector& keys) { template void BaseProcessor::doRemove(const std::string& key) { - kvstore_->asyncRemove(kDefaultSpaceId_, - kDefaultPartId_, + kvstore_->asyncRemove(kDefaultSpaceId, + kDefaultPartId, key, [this] (kvstore::ResultCode code) { this->resp_.set_code(to(code)); @@ -75,8 +75,8 @@ void BaseProcessor::doRemove(const std::string& key) { template void BaseProcessor::doMultiRemove(std::vector keys) { - kvstore_->asyncMultiRemove(kDefaultSpaceId_, - kDefaultPartId_, + kvstore_->asyncMultiRemove(kDefaultSpaceId, + kDefaultPartId, std::move(keys), [this] (kvstore::ResultCode code) { this->resp_.set_code(to(code)); @@ -88,8 +88,8 @@ void BaseProcessor::doMultiRemove(std::vector keys) { template void BaseProcessor::doRemoveRange(const std::string& start, const std::string& end) { - kvstore_->asyncRemoveRange(kDefaultSpaceId_, - kDefaultPartId_, + kvstore_->asyncRemoveRange(kDefaultSpaceId, + kDefaultPartId, start, end, [this] (kvstore::ResultCode code) { @@ -103,7 +103,7 @@ template StatusOr> BaseProcessor::doScan(const std::string& start, const std::string& end) { std::unique_ptr iter; - auto code = kvstore_->range(kDefaultSpaceId_, kDefaultPartId_, start, end, &iter); + auto code = kvstore_->range(kDefaultSpaceId, kDefaultPartId, start, end, &iter); if (code != kvstore::ResultCode::SUCCEEDED) { return Status::Error("Scan Failed"); } @@ -122,7 +122,7 @@ StatusOr> BaseProcessor::allHosts() { std::vector hosts; const auto& prefix = MetaServiceUtils::hostPrefix(); std::unique_ptr iter; - auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); + auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { return Status::Error("Can't find any hosts"); } @@ -144,7 +144,7 @@ int32_t BaseProcessor::autoIncrementId() { static const std::string kIdKey = "__id__"; int32_t id; std::string val; - auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, kIdKey, &val); + auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, kIdKey, &val); if (ret != kvstore::ResultCode::SUCCEEDED) { CHECK_EQ(kvstore::ResultCode::ERR_KEY_NOT_FOUND, ret); id = 1; @@ -155,8 +155,8 @@ int32_t BaseProcessor::autoIncrementId() { std::vector data; data.emplace_back(kIdKey, std::string(reinterpret_cast(&id), sizeof(id))); - kvstore_->asyncMultiPut(kDefaultSpaceId_, - kDefaultPartId_, + kvstore_->asyncMultiPut(kDefaultSpaceId, + kDefaultPartId, std::move(data), [this] (kvstore::ResultCode code) { CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); @@ -170,7 +170,7 @@ Status BaseProcessor::spaceExist(GraphSpaceID spaceId) { folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); auto spaceKey = MetaServiceUtils::spaceKey(spaceId); std::string val; - auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, spaceKey, &val); + auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &val); if (ret == kvstore::ResultCode::SUCCEEDED) { return Status::OK(); } @@ -181,7 +181,7 @@ Status BaseProcessor::spaceExist(GraphSpaceID spaceId) { template Status BaseProcessor::hostExist(const std::string& hostKey) { std::string val; - auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, hostKey , &val); + auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, hostKey , &val); if (ret == kvstore::ResultCode::SUCCEEDED) { return Status::OK(); } @@ -193,7 +193,7 @@ template StatusOr BaseProcessor::getSpaceId(const std::string& name) { auto indexKey = MetaServiceUtils::indexSpaceKey(name); std::string val; - auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, indexKey, &val); + auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, indexKey, &val); if (ret == kvstore::ResultCode::SUCCEEDED) { return *reinterpret_cast(val.c_str()); } @@ -205,7 +205,7 @@ template StatusOr BaseProcessor::getTagId(GraphSpaceID spaceId, const std::string& name) { auto indexKey = MetaServiceUtils::indexTagKey(spaceId, name); std::string val; - auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, indexKey, &val); + auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, indexKey, &val); if (ret == kvstore::ResultCode::SUCCEEDED) { return *reinterpret_cast(val.c_str()); } diff --git a/src/meta/processors/partsMan/DropSpaceProcessor.cpp b/src/meta/processors/partsMan/DropSpaceProcessor.cpp index 36c7f5483a4..0ee7a5c4926 100644 --- a/src/meta/processors/partsMan/DropSpaceProcessor.cpp +++ b/src/meta/processors/partsMan/DropSpaceProcessor.cpp @@ -26,7 +26,7 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) { auto prefix = MetaServiceUtils::partPrefix(spaceId); std::unique_ptr iter; - auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); + auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { resp_.set_code(to(ret)); onFinished(); diff --git a/src/meta/processors/partsMan/GetPartsAllocProcessor.cpp b/src/meta/processors/partsMan/GetPartsAllocProcessor.cpp index 34b9abf3bf8..722836d2dae 100644 --- a/src/meta/processors/partsMan/GetPartsAllocProcessor.cpp +++ b/src/meta/processors/partsMan/GetPartsAllocProcessor.cpp @@ -14,7 +14,7 @@ void GetPartsAllocProcessor::process(const cpp2::GetPartsAllocReq& req) { auto spaceId = req.get_space_id(); auto prefix = MetaServiceUtils::partPrefix(spaceId); std::unique_ptr iter; - auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); + auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); resp_.set_code(to(ret)); if (ret != kvstore::ResultCode::SUCCEEDED) { onFinished(); diff --git a/src/meta/processors/partsMan/ListHostsProcessor.cpp b/src/meta/processors/partsMan/ListHostsProcessor.cpp index 50d1b4807a2..7adf3eb06d7 100644 --- a/src/meta/processors/partsMan/ListHostsProcessor.cpp +++ b/src/meta/processors/partsMan/ListHostsProcessor.cpp @@ -27,7 +27,7 @@ StatusOr> ListHostsProcessor::allHostsWithStatus() { std::vector hostItems; const auto& prefix = MetaServiceUtils::hostPrefix(); std::unique_ptr iter; - auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); + auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { return Status::Error("Can't find any hosts"); } diff --git a/src/meta/processors/partsMan/ListSpacesProcessor.cpp b/src/meta/processors/partsMan/ListSpacesProcessor.cpp index 6fa464db848..9df414043b8 100644 --- a/src/meta/processors/partsMan/ListSpacesProcessor.cpp +++ b/src/meta/processors/partsMan/ListSpacesProcessor.cpp @@ -14,7 +14,7 @@ void ListSpacesProcessor::process(const cpp2::ListSpacesReq& req) { folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); auto prefix = MetaServiceUtils::spacePrefix(); std::unique_ptr iter; - auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); + auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { resp_.set_code(to(ret)); onFinished(); diff --git a/src/meta/processors/schemaMan/AlterEdgeProcessor.cpp b/src/meta/processors/schemaMan/AlterEdgeProcessor.cpp index 44dc1a40948..ef7a5e1af92 100644 --- a/src/meta/processors/schemaMan/AlterEdgeProcessor.cpp +++ b/src/meta/processors/schemaMan/AlterEdgeProcessor.cpp @@ -24,7 +24,7 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) { // Check the edge belongs to the space std::unique_ptr iter; auto edgePrefix = MetaServiceUtils::schemaEdgePrefix(req.get_space_id(), edgeType); - auto code = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, edgePrefix, &iter); + auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, edgePrefix, &iter); if (code != kvstore::ResultCode::SUCCEEDED || !iter->valid()) { LOG(WARNING) << "Edge could not be found " << req.get_edge_name() << ", spaceId " << req.get_space_id() diff --git a/src/meta/processors/schemaMan/AlterTagProcessor.cpp b/src/meta/processors/schemaMan/AlterTagProcessor.cpp index 055f23e65e1..407346991be 100644 --- a/src/meta/processors/schemaMan/AlterTagProcessor.cpp +++ b/src/meta/processors/schemaMan/AlterTagProcessor.cpp @@ -24,7 +24,7 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) { // Check the tag belongs to the space std::unique_ptr iter; auto tagPrefix = MetaServiceUtils::schemaTagPrefix(req.get_space_id(), tagId); - auto code = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, tagPrefix, &iter); + auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, tagPrefix, &iter); if (code != kvstore::ResultCode::SUCCEEDED || !iter->valid()) { LOG(WARNING) << "Tag could not be found " << req.get_tag_name() << ", spaceId " << req.get_space_id() diff --git a/src/meta/processors/schemaMan/DropEdgeProcessor.cpp b/src/meta/processors/schemaMan/DropEdgeProcessor.cpp index 2e82341e02b..dd007f2e635 100644 --- a/src/meta/processors/schemaMan/DropEdgeProcessor.cpp +++ b/src/meta/processors/schemaMan/DropEdgeProcessor.cpp @@ -29,7 +29,7 @@ StatusOr> DropEdgeProcessor::getEdgeKeys(GraphSpaceID i std::vector keys; std::string edgeVal; EdgeType edgeType; - auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, indexKey, &edgeVal); + auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, indexKey, &edgeVal); if (ret == kvstore::ResultCode::SUCCEEDED) { edgeType = *reinterpret_cast(edgeVal.data()); resp_.set_id(to(edgeType, EntryType::EDGE)); @@ -40,7 +40,7 @@ StatusOr> DropEdgeProcessor::getEdgeKeys(GraphSpaceID i std::unique_ptr iter; auto key = MetaServiceUtils::schemaEdgePrefix(id, edgeType); - ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, key, &iter); + ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, key, &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { return Status::Error("Edge get error by id : %d !", edgeType); } diff --git a/src/meta/processors/schemaMan/DropTagProcessor.cpp b/src/meta/processors/schemaMan/DropTagProcessor.cpp index 71334562614..71db010e164 100644 --- a/src/meta/processors/schemaMan/DropTagProcessor.cpp +++ b/src/meta/processors/schemaMan/DropTagProcessor.cpp @@ -30,7 +30,7 @@ StatusOr> DropTagProcessor::getTagKeys(GraphSpaceID id, std::vector keys; std::string tagVal; TagID tagId; - auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, indexKey, &tagVal); + auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, indexKey, &tagVal); if (ret == kvstore::ResultCode::SUCCEEDED) { tagId = *reinterpret_cast(tagVal.data()); resp_.set_id(to(tagId, EntryType::TAG)); @@ -41,7 +41,7 @@ StatusOr> DropTagProcessor::getTagKeys(GraphSpaceID id, std::unique_ptr iter; auto key = MetaServiceUtils::schemaTagPrefix(id, tagId); - ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, key, &iter); + ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, key, &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { return Status::Error("Tag get error by id : %d !", tagId); } diff --git a/src/meta/processors/schemaMan/ListEdgesProcessor.cpp b/src/meta/processors/schemaMan/ListEdgesProcessor.cpp index 2af2896a453..293a86cd9c6 100644 --- a/src/meta/processors/schemaMan/ListEdgesProcessor.cpp +++ b/src/meta/processors/schemaMan/ListEdgesProcessor.cpp @@ -15,7 +15,7 @@ void ListEdgesProcessor::process(const cpp2::ListEdgesReq& req) { auto spaceId = req.get_space_id(); auto prefix = MetaServiceUtils::schemaEdgesPrefix(spaceId); std::unique_ptr iter; - auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); + auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); resp_.set_code(to(ret)); if (ret != kvstore::ResultCode::SUCCEEDED) { onFinished(); diff --git a/src/meta/processors/schemaMan/ListTagsProcessor.cpp b/src/meta/processors/schemaMan/ListTagsProcessor.cpp index af6daa01d73..0fe4b7d8129 100644 --- a/src/meta/processors/schemaMan/ListTagsProcessor.cpp +++ b/src/meta/processors/schemaMan/ListTagsProcessor.cpp @@ -15,7 +15,7 @@ void ListTagsProcessor::process(const cpp2::ListTagsReq& req) { auto spaceId = req.get_space_id(); auto prefix = MetaServiceUtils::schemaTagsPrefix(spaceId); std::unique_ptr iter; - auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); + auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); resp_.set_code(to(ret)); if (ret != kvstore::ResultCode::SUCCEEDED) { onFinished(); diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 14d31effc34..6b1dd211b61 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -122,7 +122,11 @@ add_executable( $ $ $ + $ $ + $ + $ + $ ) nebula_link_libraries( active_hosts_man_test diff --git a/src/meta/test/ProcessorTest.cpp b/src/meta/test/ProcessorTest.cpp index 0764d917501..a3e790121fe 100644 --- a/src/meta/test/ProcessorTest.cpp +++ b/src/meta/test/ProcessorTest.cpp @@ -44,8 +44,8 @@ using apache::thrift::FragileConstructor::FRAGILE; TEST(ProcessorTest, AddHostsTest) { fs::TempDir rootPath("/tmp/AddHostsTest.XXXXXX"); - FLAGS_expired_hosts_check_interval_sec = 1; - FLAGS_expired_threshold_sec = 1; + FLAGS_expired_hosts_check_interval_sec = 2; + FLAGS_expired_threshold_sec = 2; std::unique_ptr kv(TestUtils::initKV(rootPath.path())); { std::vector thriftHosts; @@ -171,7 +171,7 @@ TEST(ProcessorTest, ListHostsTest) { } { // host info expired - sleep(3); + sleep(FLAGS_expired_hosts_check_interval_sec + FLAGS_expired_threshold_sec); cpp2::ListHostsReq req; auto* processor = ListHostsProcessor::instance(kv.get()); auto f = processor->getFuture(); @@ -184,6 +184,7 @@ TEST(ProcessorTest, ListHostsTest) { ASSERT_EQ(cpp2::HostStatus::OFFLINE, resp.hosts[i].status); } } + ActiveHostsMan::instance()->stopClean(); } TEST(ProcessorTest, CreateSpaceTest) {