diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h new file mode 100644 index 00000000000..e2a83559e97 --- /dev/null +++ b/src/meta/ActiveHostsMan.h @@ -0,0 +1,101 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_ACTIVEHOSTSMAN_H_ +#define META_ACTIVEHOSTSMAN_H_ + +#include "base/Base.h" +#include +#include "thread/GenericWorker.h" +#include "time/TimeUtils.h" + +namespace nebula { +namespace meta { + +struct HostInfo { + HostInfo() = default; + explicit HostInfo(int64_t lastHBTimeInSec) + : lastHBTimeInSec_(lastHBTimeInSec) {} + + bool operator==(const HostInfo& that) const { + return this->lastHBTimeInSec_ == that.lastHBTimeInSec_; + } + + bool operator!=(const HostInfo& that) const { + return !(*this == that); + } + + int64_t lastHBTimeInSec_ = 0; +}; + +class ActiveHostsMan final { + FRIEND_TEST(ActiveHostsManTest, NormalTest); + +public: + ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds) + : intervalSeconds_(intervalSeconds) + , expirationInSeconds_(expiredSeconds) { + CHECK(checkThread_.start()); + checkThread_.addTimerTask(intervalSeconds * 1000, + intervalSeconds * 1000, + &ActiveHostsMan::cleanExpiredHosts, + this); + } + + ~ActiveHostsMan() { + checkThread_.stop(); + checkThread_.wait(); + } + + void updateHostInfo(HostAddr hostAddr, HostInfo info) { + folly::RWSpinLock::ReadHolder rh(&lock_); + auto it = hostsMap_.find(hostAddr); + if (it == hostsMap_.end()) { + folly::RWSpinLock::UpgradedHolder uh(&lock_); + hostsMap_.emplace(std::move(hostAddr), std::move(info)); + } else { + it->second.lastHBTimeInSec_ = info.lastHBTimeInSec_; + } + } + + std::vector getActiveHosts() { + std::vector hosts; + folly::RWSpinLock::ReadHolder rh(&lock_); + hosts.resize(hostsMap_.size()); + std::transform(hostsMap_.begin(), hostsMap_.end(), hosts.begin(), + [](const auto& entry) -> decltype(auto) { + return entry.first; + }); + return hosts; + } + +protected: + void cleanExpiredHosts() { + int64_t now = time::TimeUtils::nowInSeconds(); + folly::RWSpinLock::WriteHolder rh(&lock_); + auto it = hostsMap_.begin(); + while (it != hostsMap_.end()) { + if ((now - it->second.lastHBTimeInSec_) > expirationInSeconds_) { + LOG(INFO) << it->first << " expired! last hb time " + << it->second.lastHBTimeInSec_; + it = hostsMap_.erase(it); + } else { + it++; + } + } + } + +private: + folly::RWSpinLock lock_; + std::unordered_map hostsMap_; + thread::GenericWorker checkThread_; + int32_t intervalSeconds_ = 0; + int32_t expirationInSeconds_ = 5 * 60; +}; +} // namespace meta +} // namespace nebula + +#endif // META_ACTIVEHOSTSMAN_H_ diff --git a/src/meta/test/ActiveHostsManTest.cpp b/src/meta/test/ActiveHostsManTest.cpp new file mode 100644 index 00000000000..e82e2c29204 --- /dev/null +++ b/src/meta/test/ActiveHostsManTest.cpp @@ -0,0 +1,47 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include "base/Base.h" +#include +#include "meta/ActiveHostsMan.h" + +namespace nebula { +namespace meta { + +TEST(ActiveHostsManTest, NormalTest) { + auto now = time::TimeUtils::nowInSeconds(); + ActiveHostsMan ahMan(1, 1); + ahMan.updateHostInfo(HostAddr(0, 0), HostInfo(now)); + ahMan.updateHostInfo(HostAddr(0, 1), HostInfo(now)); + ahMan.updateHostInfo(HostAddr(0, 2), HostInfo(now)); + + ASSERT_EQ(3, ahMan.getActiveHosts().size()); + ahMan.updateHostInfo(HostAddr(0, 0), HostInfo(now + 2)); + ASSERT_EQ(3, ahMan.getActiveHosts().size()); + { + folly::RWSpinLock::ReadHolder rh(&ahMan.lock_); + ASSERT_EQ(HostInfo(now + 2), ahMan.hostsMap_[HostAddr(0, 0)]); + } + + sleep(3); + ASSERT_EQ(1, ahMan.getActiveHosts().size()); + { + folly::RWSpinLock::ReadHolder rh(&ahMan.lock_); + ASSERT_EQ(HostInfo(now + 2), ahMan.hostsMap_[HostAddr(0, 0)]); + } +} + +} // namespace meta +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 6ccf3dfb8bc..20c91288845 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -92,3 +92,19 @@ nebula_link_libraries( nebula_add_test(meta_client_test) +add_executable( + active_hosts_man_test + ActiveHostsManTest.cpp + $ + $ + $ + $ + $ +) +nebula_link_libraries( + active_hosts_man_test + gtest +) + +nebula_add_test(active_hosts_man_test) +