From e25ab460fae7dd4e916314da17367596b161d54b Mon Sep 17 00:00:00 2001 From: andychow01 <8199095+andychow01@users.noreply.github.com> Date: Tue, 2 Jul 2019 12:08:47 +0800 Subject: [PATCH] merge host info from kvstore (#550) * merge host info from kvstore * add read lock * found to notFound * align fix --- src/meta/ActiveHostsMan.cpp | 21 ++++++++++++ src/meta/ActiveHostsMan.h | 1 + src/meta/test/ActiveHostsManTest.cpp | 50 ++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+) diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 901b024434a..8b64678f5b1 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -120,6 +120,27 @@ void ActiveHostsMan::cleanExpiredHosts() { } } + // merge host info from kvstore + if (kvstore_ != nullptr) { + const auto& prefix = MetaServiceUtils::hostPrefix(); + std::unique_ptr iter; + auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (ret == kvstore::ResultCode::SUCCEEDED) { + while (iter->valid()) { + auto host = MetaServiceUtils::parseHostKey(iter->key()); + if (iter->val() == MetaServiceUtils::hostValOnline()) { + folly::RWSpinLock::ReadHolder rh(&lock_); + bool notFound = hostsMap_.find({host.ip, host.port}) == hostsMap_.end(); + if (notFound) { + data.emplace_back(MetaServiceUtils::hostKey(host.ip, host.port), + MetaServiceUtils::hostValOffline()); + } + } + iter->next(); + } + } + } + if (!data.empty() && kvstore_ != nullptr) { folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); LOG(INFO) << "set " << data.size() << " expired hosts to offline in meta rocksdb"; diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index 95cc2a5c580..6d483d4734c 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -36,6 +36,7 @@ struct HostInfo { class ActiveHostsMan final { FRIEND_TEST(ActiveHostsManTest, NormalTest); + FRIEND_TEST(ActiveHostsManTest, MergeHostInfo); FRIEND_TEST(ProcessorTest, ListHostsTest); public: diff --git a/src/meta/test/ActiveHostsManTest.cpp b/src/meta/test/ActiveHostsManTest.cpp index 4e2cd53afc4..c6980e09f2f 100644 --- a/src/meta/test/ActiveHostsManTest.cpp +++ b/src/meta/test/ActiveHostsManTest.cpp @@ -37,6 +37,56 @@ TEST(ActiveHostsManTest, NormalTest) { } } +TEST(ActiveHostsManTest, MergeHostInfo) { + fs::TempDir rootPath("/tmp/ActiveHostsMergeTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + { + std::vector data; + for (auto i = 0; i < 3; i++) { + data.emplace_back(MetaServiceUtils::hostKey(0, i), + MetaServiceUtils::hostValOnline()); + } + kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), + [] (kvstore::ResultCode code) { + CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); + }); + } + + int onlineNum = 0; + int offlineNum = 0; + const auto& prefix = MetaServiceUtils::hostPrefix(); + std::unique_ptr iter; + kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + while (iter->valid()) { + if (iter->val() == MetaServiceUtils::hostValOnline()) { + ++onlineNum; + } else { + ++offlineNum; + } + iter->next(); + } + ASSERT_EQ(3, onlineNum); + ASSERT_EQ(0, offlineNum); + + ActiveHostsMan ahMan(1, 1, kv.get()); + sleep(ahMan.intervalSeconds_ + 1); + onlineNum = 0; + offlineNum = 0; + kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + while (iter->valid()) { + auto host = MetaServiceUtils::parseHostKey(iter->key()); + LOG(INFO) << "ip: " << host.ip << ", port: " << host.port << ", status: " << iter->val(); + if (iter->val() == MetaServiceUtils::hostValOnline()) { + ++onlineNum; + } else { + ++offlineNum; + } + iter->next(); + } + ASSERT_EQ(0, onlineNum); + ASSERT_EQ(3, offlineNum); +} + } // namespace meta } // namespace nebula