From 649a828da1996dfa504d53de34a687661a8382ff Mon Sep 17 00:00:00 2001 From: heng Date: Thu, 25 Apr 2019 10:42:46 +0800 Subject: [PATCH 1/2] Add active hosts manager for balance in the future. --- src/meta/ActiveHostsMan.h | 100 +++++++++++++++++++++++++++ src/meta/test/ActiveHostsManTest.cpp | 41 +++++++++++ src/meta/test/CMakeLists.txt | 16 +++++ 3 files changed, 157 insertions(+) create mode 100644 src/meta/ActiveHostsMan.h create mode 100644 src/meta/test/ActiveHostsManTest.cpp diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h new file mode 100644 index 00000000000..6a1f2a7e6c9 --- /dev/null +++ b/src/meta/ActiveHostsMan.h @@ -0,0 +1,100 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_ACTIVEHOSTSMAN_H_ +#define META_ACTIVEHOSTSMAN_H_ + +#include "base/Base.h" +#include +#include "thread/GenericWorker.h" +#include "time/TimeUtils.h" + +namespace nebula { +namespace meta { + +struct HostInfo { + HostInfo() = default; + explicit HostInfo(int64_t lastHeartBeatTimeSeconds) + : lastHeartBeatTimeSeconds_(lastHeartBeatTimeSeconds) {} + + bool operator==(const HostInfo& that) const { + return this->lastHeartBeatTimeSeconds_ == that.lastHeartBeatTimeSeconds_; + } + + bool operator!=(const HostInfo& that) const { + return !(*this == that); + } + + int64_t lastHeartBeatTimeSeconds_ = 0; +}; + +class ActiveHostsMan final { + FRIEND_TEST(ActiveHostsManTest, NormalTest); + +public: + ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds) + : intervalSeconds_(intervalSeconds) + , expiredSeconds_(expiredSeconds) { + CHECK(checkThread_.start()); + checkThread_.addTimerTask(intervalSeconds * 1000, + intervalSeconds * 1000, + &ActiveHostsMan::cleanExpiredHosts, this); + } + + ~ActiveHostsMan() { + checkThread_.stop(); + checkThread_.wait(); + } + + void updateHostInfo(HostAddr hostAddr, HostInfo info) { + folly::RWSpinLock::ReadHolder rh(&lock_); + auto it = hostsMap_.find(hostAddr); + if (it == hostsMap_.end()) { + folly::RWSpinLock::UpgradedHolder uh(&lock_); + hostsMap_.emplace(std::move(hostAddr), std::move(info)); + } else { + it->second.lastHeartBeatTimeSeconds_ = info.lastHeartBeatTimeSeconds_; + } + } + + std::vector getActiveHosts() { + std::vector hosts; + folly::RWSpinLock::ReadHolder rh(&lock_); + hosts.resize(hostsMap_.size()); + std::transform(hostsMap_.begin(), hostsMap_.end(), hosts.begin(), + [](const auto& entry) -> decltype(auto) { + return entry.first; + }); + return hosts; + } + +protected: + void cleanExpiredHosts() { + int64_t now = time::TimeUtils::nowInSeconds(); + folly::RWSpinLock::WriteHolder rh(&lock_); + auto it = hostsMap_.begin(); + while (it != hostsMap_.end()) { + if ((now - it->second.lastHeartBeatTimeSeconds_) > expiredSeconds_) { + LOG(INFO) << it->first << " expired! last hb time " + << it->second.lastHeartBeatTimeSeconds_; + it = hostsMap_.erase(it); + } else { + it++; + } + } + } + +private: + folly::RWSpinLock lock_; + std::unordered_map hostsMap_; + thread::GenericWorker checkThread_; + int32_t intervalSeconds_ = 0; + int32_t expiredSeconds_ = 5 * 60; +}; +} // namespace meta +} // namespace nebula + +#endif // META_ACTIVEHOSTSMAN_H_ diff --git a/src/meta/test/ActiveHostsManTest.cpp b/src/meta/test/ActiveHostsManTest.cpp new file mode 100644 index 00000000000..673b5870d05 --- /dev/null +++ b/src/meta/test/ActiveHostsManTest.cpp @@ -0,0 +1,41 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include "base/Base.h" +#include +#include "meta/ActiveHostsMan.h" + +namespace nebula { +namespace meta { + +TEST(ActiveHostsManTest, NormalTest) { + auto now = time::TimeUtils::nowInSeconds(); + ActiveHostsMan ahMan(1, 1); + ahMan.updateHostInfo(HostAddr(0, 0), HostInfo(now)); + ahMan.updateHostInfo(HostAddr(0, 1), HostInfo(now)); + ahMan.updateHostInfo(HostAddr(0, 2), HostInfo(now)); + + ASSERT_EQ(3, ahMan.getActiveHosts().size()); + ahMan.updateHostInfo(HostAddr(0, 0), HostInfo(now + 2)); + ASSERT_EQ(3, ahMan.getActiveHosts().size()); + ASSERT_EQ(HostInfo(now + 2), ahMan.hostsMap_[HostAddr(0, 0)]); + + sleep(3); + ASSERT_EQ(1, ahMan.getActiveHosts().size()); + ASSERT_EQ(HostInfo(now + 2), ahMan.hostsMap_[HostAddr(0, 0)]); +} + +} // namespace meta +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index addacadbf47..5ac3bec303f 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -90,3 +90,19 @@ nebula_link_libraries( nebula_add_test(meta_client_test) +add_executable( + active_hosts_man_test + ActiveHostsManTest.cpp + $ + $ + $ + $ + $ +) +nebula_link_libraries( + active_hosts_man_test + gtest +) + +nebula_add_test(active_hosts_man_test) + From aad12f69a1c4bab7590a88a4d5fbbeb7555c0f2f Mon Sep 17 00:00:00 2001 From: heng Date: Sun, 28 Apr 2019 14:11:46 +0800 Subject: [PATCH 2/2] Address dutor's and sherman's comments --- src/meta/ActiveHostsMan.h | 21 +++++++++++---------- src/meta/test/ActiveHostsManTest.cpp | 10 ++++++++-- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index 6a1f2a7e6c9..e2a83559e97 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -17,18 +17,18 @@ namespace meta { struct HostInfo { HostInfo() = default; - explicit HostInfo(int64_t lastHeartBeatTimeSeconds) - : lastHeartBeatTimeSeconds_(lastHeartBeatTimeSeconds) {} + explicit HostInfo(int64_t lastHBTimeInSec) + : lastHBTimeInSec_(lastHBTimeInSec) {} bool operator==(const HostInfo& that) const { - return this->lastHeartBeatTimeSeconds_ == that.lastHeartBeatTimeSeconds_; + return this->lastHBTimeInSec_ == that.lastHBTimeInSec_; } bool operator!=(const HostInfo& that) const { return !(*this == that); } - int64_t lastHeartBeatTimeSeconds_ = 0; + int64_t lastHBTimeInSec_ = 0; }; class ActiveHostsMan final { @@ -37,11 +37,12 @@ class ActiveHostsMan final { public: ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds) : intervalSeconds_(intervalSeconds) - , expiredSeconds_(expiredSeconds) { + , expirationInSeconds_(expiredSeconds) { CHECK(checkThread_.start()); checkThread_.addTimerTask(intervalSeconds * 1000, intervalSeconds * 1000, - &ActiveHostsMan::cleanExpiredHosts, this); + &ActiveHostsMan::cleanExpiredHosts, + this); } ~ActiveHostsMan() { @@ -56,7 +57,7 @@ class ActiveHostsMan final { folly::RWSpinLock::UpgradedHolder uh(&lock_); hostsMap_.emplace(std::move(hostAddr), std::move(info)); } else { - it->second.lastHeartBeatTimeSeconds_ = info.lastHeartBeatTimeSeconds_; + it->second.lastHBTimeInSec_ = info.lastHBTimeInSec_; } } @@ -77,9 +78,9 @@ class ActiveHostsMan final { folly::RWSpinLock::WriteHolder rh(&lock_); auto it = hostsMap_.begin(); while (it != hostsMap_.end()) { - if ((now - it->second.lastHeartBeatTimeSeconds_) > expiredSeconds_) { + if ((now - it->second.lastHBTimeInSec_) > expirationInSeconds_) { LOG(INFO) << it->first << " expired! last hb time " - << it->second.lastHeartBeatTimeSeconds_; + << it->second.lastHBTimeInSec_; it = hostsMap_.erase(it); } else { it++; @@ -92,7 +93,7 @@ class ActiveHostsMan final { std::unordered_map hostsMap_; thread::GenericWorker checkThread_; int32_t intervalSeconds_ = 0; - int32_t expiredSeconds_ = 5 * 60; + int32_t expirationInSeconds_ = 5 * 60; }; } // namespace meta } // namespace nebula diff --git a/src/meta/test/ActiveHostsManTest.cpp b/src/meta/test/ActiveHostsManTest.cpp index 673b5870d05..e82e2c29204 100644 --- a/src/meta/test/ActiveHostsManTest.cpp +++ b/src/meta/test/ActiveHostsManTest.cpp @@ -20,11 +20,17 @@ TEST(ActiveHostsManTest, NormalTest) { ASSERT_EQ(3, ahMan.getActiveHosts().size()); ahMan.updateHostInfo(HostAddr(0, 0), HostInfo(now + 2)); ASSERT_EQ(3, ahMan.getActiveHosts().size()); - ASSERT_EQ(HostInfo(now + 2), ahMan.hostsMap_[HostAddr(0, 0)]); + { + folly::RWSpinLock::ReadHolder rh(&ahMan.lock_); + ASSERT_EQ(HostInfo(now + 2), ahMan.hostsMap_[HostAddr(0, 0)]); + } sleep(3); ASSERT_EQ(1, ahMan.getActiveHosts().size()); - ASSERT_EQ(HostInfo(now + 2), ahMan.hostsMap_[HostAddr(0, 0)]); + { + folly::RWSpinLock::ReadHolder rh(&ahMan.lock_); + ASSERT_EQ(HostInfo(now + 2), ahMan.hostsMap_[HostAddr(0, 0)]); + } } } // namespace meta