From fe41d412e796ccf82dd96f6f17ea9df0b47a4459 Mon Sep 17 00:00:00 2001 From: andychow01 Date: Tue, 25 Jun 2019 16:02:53 +0800 Subject: [PATCH 1/2] merge host info --- src/meta/ActiveHostsMan.cpp | 16 ++++++-- src/meta/ActiveHostsMan.h | 6 +-- src/meta/test/ActiveHostsMergeTest.cpp | 54 ++++++++++++++++++++++++++ src/meta/test/CMakeLists.txt | 30 ++++++++++++++ 4 files changed, 99 insertions(+), 7 deletions(-) create mode 100644 src/meta/test/ActiveHostsMergeTest.cpp diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 7b772946cf3..02f4947cb32 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -27,7 +27,7 @@ ActiveHostsMan::ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds, CHECK(checkThread_.start()); checkThread_.addTimerTask(intervalSeconds * 1000, intervalSeconds * 1000, - &ActiveHostsMan::cleanExpiredHosts, + &ActiveHostsMan::handleHosts, this); } @@ -77,7 +77,12 @@ std::vector ActiveHostsMan::getActiveHosts() { return hosts; } -void ActiveHostsMan::loadHostMap() { +void ActiveHostsMan::handleHosts() { + cleanExpiredHosts(); + mergeHosts(); +} + +void ActiveHostsMan::mergeHosts() { if (kvstore_ == nullptr) { return; } @@ -94,8 +99,11 @@ void ActiveHostsMan::loadHostMap() { HostInfo info; info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds(); if (iter->val() == MetaServiceUtils::hostValOnline()) { - LOG(INFO) << "load host " << host.ip << ":" << host.port; - updateHostInfo({host.ip, host.port}, info); + bool found = hostsMap_.find({host.ip, host.port}) != hostsMap_.end(); + if (!found) { + LOG(INFO) << "add active host " << host.ip << ":" << host.port; + updateHostInfo({host.ip, host.port}, info); + } } iter->next(); } diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index f9aae295ab3..ce3f5ba30c2 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -37,6 +37,7 @@ struct HostInfo { class ActiveHostsMan final { FRIEND_TEST(ActiveHostsManTest, NormalTest); + FRIEND_TEST(ActiveHostsMergeTest, NormalTest); FRIEND_TEST(ProcessorTest, ListHostsTest); public: @@ -44,8 +45,6 @@ class ActiveHostsMan final { static auto activeHostsMan = std::unique_ptr( new ActiveHostsMan(FLAGS_expired_hosts_check_interval_sec, FLAGS_expired_threshold_sec, kv)); - static std::once_flag initFlag; - std::call_once(initFlag, &ActiveHostsMan::loadHostMap, activeHostsMan.get()); return activeHostsMan.get(); } @@ -70,7 +69,8 @@ class ActiveHostsMan final { private: ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds, kvstore::KVStore* kv = nullptr); - void loadHostMap(); + void handleHosts(); + void mergeHosts(); void stopClean() { checkThread_.stop(); diff --git a/src/meta/test/ActiveHostsMergeTest.cpp b/src/meta/test/ActiveHostsMergeTest.cpp new file mode 100644 index 00000000000..c881e673612 --- /dev/null +++ b/src/meta/test/ActiveHostsMergeTest.cpp @@ -0,0 +1,54 @@ + +/* Copyright (c) 2018 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ +#include "base/Base.h" +#include +#include "meta/ActiveHostsMan.h" +#include "fs/TempDir.h" +#include "meta/test/TestUtils.h" + +namespace nebula { +namespace meta { + +TEST(ActiveHostsMergeTest, NormalTest) { + fs::TempDir rootPath("/tmp/ActiveHostsMergeTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + auto now = time::TimeUtils::nowInSeconds(); + ActiveHostsMan ahMan(1, 2, kv.get()); + ahMan.updateHostInfo(HostAddr(0, 0), HostInfo(now)); + ahMan.updateHostInfo(HostAddr(0, 1), HostInfo(now)); + ahMan.updateHostInfo(HostAddr(0, 2), HostInfo(now)); + ASSERT_EQ(3, ahMan.getActiveHosts().size()); + sleep(4); + ASSERT_EQ(0, ahMan.getActiveHosts().size()); + { + std::vector data; + for (auto i = 0; i < 3; i++) { + data.emplace_back(MetaServiceUtils::hostKey(i, i), + MetaServiceUtils::hostValOnline()); + } + kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), + [] (kvstore::ResultCode code) { + CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); + }); + } + sleep(2); + ASSERT_EQ(3, ahMan.getActiveHosts().size()); +} + +} // namespace meta +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + + diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 77061e4f336..1114503ccc0 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -138,6 +138,36 @@ nebula_link_libraries( nebula_add_test(active_hosts_man_test) +add_executable( + active_hosts_merge_test + ActiveHostsMergeTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +nebula_link_libraries( + active_hosts_merge_test + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + gtest +) +nebula_add_test(active_hosts_merge_test) + + add_executable( meta_http_test MetaHttpHandlerTest.cpp From e62097b95e8a7c32038b2824830485f0e17a6105 Mon Sep 17 00:00:00 2001 From: andychow01 <8199095+andychow01@users.noreply.github.com> Date: Wed, 26 Jun 2019 15:34:17 +0800 Subject: [PATCH 2/2] merge host infos from kvstore. --- src/meta/ActiveHostsMan.cpp | 7 ++-- src/meta/ActiveHostsMan.h | 2 +- src/meta/MetaServiceUtils.cpp | 5 +++ src/meta/MetaServiceUtils.h | 2 + src/meta/test/ActiveHostsManTest.cpp | 26 +++++++++++++ src/meta/test/ActiveHostsMergeTest.cpp | 54 -------------------------- src/meta/test/CMakeLists.txt | 30 -------------- 7 files changed, 38 insertions(+), 88 deletions(-) delete mode 100644 src/meta/test/ActiveHostsMergeTest.cpp diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 02f4947cb32..86453afecc0 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -98,7 +98,8 @@ void ActiveHostsMan::mergeHosts() { auto host = MetaServiceUtils::parseHostKey(iter->key()); HostInfo info; info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds(); - if (iter->val() == MetaServiceUtils::hostValOnline()) { + if (iter->val() == MetaServiceUtils::hostValKVStoreOnline()) { + folly::RWSpinLock::ReadHolder rh(&lock_); bool found = hostsMap_.find({host.ip, host.port}) != hostsMap_.end(); if (!found) { LOG(INFO) << "add active host " << host.ip << ":" << host.port; @@ -120,7 +121,7 @@ void ActiveHostsMan::cleanExpiredHosts() { LOG(INFO) << it->first << " expired! last hb time " << it->second.lastHBTimeInSec_; data.emplace_back(MetaServiceUtils::hostKey(it->first.first, it->first.second), - MetaServiceUtils::hostValOffline()); + MetaServiceUtils::hostValKVStoreOnline()); it = hostsMap_.erase(it); } else { it++; @@ -130,7 +131,7 @@ void ActiveHostsMan::cleanExpiredHosts() { if (!data.empty() && kvstore_ != nullptr) { folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); - LOG(INFO) << "set " << data.size() << " expired hosts to offline in meta rocksdb"; + LOG(INFO) << "set " << data.size() << " expired hosts to kvstoreOnline in meta rocksdb"; kvstore_->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), [] (kvstore::ResultCode code) { CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index ce3f5ba30c2..ada8960249e 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -37,7 +37,7 @@ struct HostInfo { class ActiveHostsMan final { FRIEND_TEST(ActiveHostsManTest, NormalTest); - FRIEND_TEST(ActiveHostsMergeTest, NormalTest); + FRIEND_TEST(ActiveHostsManTest, NormalTest2); FRIEND_TEST(ProcessorTest, ListHostsTest); public: diff --git a/src/meta/MetaServiceUtils.cpp b/src/meta/MetaServiceUtils.cpp index 2ed8292d164..ffc550dfad7 100644 --- a/src/meta/MetaServiceUtils.cpp +++ b/src/meta/MetaServiceUtils.cpp @@ -21,6 +21,7 @@ const std::string kUsersTable = "__users__"; // NOLINT const std::string kRolesTable = "__roles__"; // NOLINT const std::string kHostOnline = "Online"; // NOLINT +const std::string kHostKVStoreOnline = "KVStoreOnline"; // NOLINT const std::string kHostOffline = "Offline"; // NOLINT std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) { @@ -112,6 +113,10 @@ std::string MetaServiceUtils::hostValOnline() { return kHostOnline; } +std::string MetaServiceUtils::hostValKVStoreOnline() { + return kHostKVStoreOnline; +} + std::string MetaServiceUtils::hostValOffline() { return kHostOffline; } diff --git a/src/meta/MetaServiceUtils.h b/src/meta/MetaServiceUtils.h index fc1bf664770..108a0a0cc3f 100644 --- a/src/meta/MetaServiceUtils.h +++ b/src/meta/MetaServiceUtils.h @@ -50,6 +50,8 @@ class MetaServiceUtils final { static std::string hostValOnline(); + static std::string hostValKVStoreOnline(); + static std::string hostValOffline(); static const std::string& hostPrefix(); diff --git a/src/meta/test/ActiveHostsManTest.cpp b/src/meta/test/ActiveHostsManTest.cpp index 4970569b76d..9fc765a28cc 100644 --- a/src/meta/test/ActiveHostsManTest.cpp +++ b/src/meta/test/ActiveHostsManTest.cpp @@ -37,6 +37,32 @@ TEST(ActiveHostsManTest, NormalTest) { } } +TEST(ActiveHostsManTest, NormalTest2) { + fs::TempDir rootPath("/tmp/ActiveHostsMergeTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + auto now = time::TimeUtils::nowInSeconds(); + ActiveHostsMan ahMan(1, 2, kv.get()); + ahMan.updateHostInfo(HostAddr(0, 0), HostInfo(now)); + ahMan.updateHostInfo(HostAddr(0, 1), HostInfo(now)); + ahMan.updateHostInfo(HostAddr(0, 2), HostInfo(now)); + ASSERT_EQ(3, ahMan.getActiveHosts().size()); + usleep(2200); + ASSERT_EQ(3, ahMan.getActiveHosts().size()); + { + std::vector data; + for (auto i = 0; i < 3; i++) { + data.emplace_back(MetaServiceUtils::hostKey(0, i), + MetaServiceUtils::hostValKVStoreOnline()); + } + kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), + [] (kvstore::ResultCode code) { + CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); + }); + } + usleep(200); + ASSERT_EQ(3, ahMan.getActiveHosts().size()); +} + } // namespace meta } // namespace nebula diff --git a/src/meta/test/ActiveHostsMergeTest.cpp b/src/meta/test/ActiveHostsMergeTest.cpp deleted file mode 100644 index c881e673612..00000000000 --- a/src/meta/test/ActiveHostsMergeTest.cpp +++ /dev/null @@ -1,54 +0,0 @@ - -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. - */ -#include "base/Base.h" -#include -#include "meta/ActiveHostsMan.h" -#include "fs/TempDir.h" -#include "meta/test/TestUtils.h" - -namespace nebula { -namespace meta { - -TEST(ActiveHostsMergeTest, NormalTest) { - fs::TempDir rootPath("/tmp/ActiveHostsMergeTest.XXXXXX"); - std::unique_ptr kv(TestUtils::initKV(rootPath.path())); - auto now = time::TimeUtils::nowInSeconds(); - ActiveHostsMan ahMan(1, 2, kv.get()); - ahMan.updateHostInfo(HostAddr(0, 0), HostInfo(now)); - ahMan.updateHostInfo(HostAddr(0, 1), HostInfo(now)); - ahMan.updateHostInfo(HostAddr(0, 2), HostInfo(now)); - ASSERT_EQ(3, ahMan.getActiveHosts().size()); - sleep(4); - ASSERT_EQ(0, ahMan.getActiveHosts().size()); - { - std::vector data; - for (auto i = 0; i < 3; i++) { - data.emplace_back(MetaServiceUtils::hostKey(i, i), - MetaServiceUtils::hostValOnline()); - } - kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), - [] (kvstore::ResultCode code) { - CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); - }); - } - sleep(2); - ASSERT_EQ(3, ahMan.getActiveHosts().size()); -} - -} // namespace meta -} // namespace nebula - - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - return RUN_ALL_TESTS(); -} - - - diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 1114503ccc0..77061e4f336 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -138,36 +138,6 @@ nebula_link_libraries( nebula_add_test(active_hosts_man_test) -add_executable( - active_hosts_merge_test - ActiveHostsMergeTest.cpp - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ - $ -) -nebula_link_libraries( - active_hosts_merge_test - ${ROCKSDB_LIBRARIES} - ${THRIFT_LIBRARIES} - wangle - gtest -) -nebula_add_test(active_hosts_merge_test) - - add_executable( meta_http_test MetaHttpHandlerTest.cpp