diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 7b772946cf3..86453afecc0 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -27,7 +27,7 @@ ActiveHostsMan::ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds, CHECK(checkThread_.start()); checkThread_.addTimerTask(intervalSeconds * 1000, intervalSeconds * 1000, - &ActiveHostsMan::cleanExpiredHosts, + &ActiveHostsMan::handleHosts, this); } @@ -77,7 +77,12 @@ std::vector ActiveHostsMan::getActiveHosts() { return hosts; } -void ActiveHostsMan::loadHostMap() { +void ActiveHostsMan::handleHosts() { + cleanExpiredHosts(); + mergeHosts(); +} + +void ActiveHostsMan::mergeHosts() { if (kvstore_ == nullptr) { return; } @@ -93,9 +98,13 @@ void ActiveHostsMan::loadHostMap() { auto host = MetaServiceUtils::parseHostKey(iter->key()); HostInfo info; info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds(); - if (iter->val() == MetaServiceUtils::hostValOnline()) { - LOG(INFO) << "load host " << host.ip << ":" << host.port; - updateHostInfo({host.ip, host.port}, info); + if (iter->val() == MetaServiceUtils::hostValKVStoreOnline()) { + folly::RWSpinLock::ReadHolder rh(&lock_); + bool found = hostsMap_.find({host.ip, host.port}) != hostsMap_.end(); + if (!found) { + LOG(INFO) << "add active host " << host.ip << ":" << host.port; + updateHostInfo({host.ip, host.port}, info); + } } iter->next(); } @@ -112,7 +121,7 @@ void ActiveHostsMan::cleanExpiredHosts() { LOG(INFO) << it->first << " expired! last hb time " << it->second.lastHBTimeInSec_; data.emplace_back(MetaServiceUtils::hostKey(it->first.first, it->first.second), - MetaServiceUtils::hostValOffline()); + MetaServiceUtils::hostValKVStoreOnline()); it = hostsMap_.erase(it); } else { it++; @@ -122,7 +131,7 @@ void ActiveHostsMan::cleanExpiredHosts() { if (!data.empty() && kvstore_ != nullptr) { folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); - LOG(INFO) << "set " << data.size() << " expired hosts to offline in meta rocksdb"; + LOG(INFO) << "set " << data.size() << " expired hosts to kvstoreOnline in meta rocksdb"; kvstore_->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), [] (kvstore::ResultCode code) { CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index f9aae295ab3..ada8960249e 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -37,6 +37,7 @@ struct HostInfo { class ActiveHostsMan final { FRIEND_TEST(ActiveHostsManTest, NormalTest); + FRIEND_TEST(ActiveHostsManTest, NormalTest2); FRIEND_TEST(ProcessorTest, ListHostsTest); public: @@ -44,8 +45,6 @@ class ActiveHostsMan final { static auto activeHostsMan = std::unique_ptr( new ActiveHostsMan(FLAGS_expired_hosts_check_interval_sec, FLAGS_expired_threshold_sec, kv)); - static std::once_flag initFlag; - std::call_once(initFlag, &ActiveHostsMan::loadHostMap, activeHostsMan.get()); return activeHostsMan.get(); } @@ -70,7 +69,8 @@ class ActiveHostsMan final { private: ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds, kvstore::KVStore* kv = nullptr); - void loadHostMap(); + void handleHosts(); + void mergeHosts(); void stopClean() { checkThread_.stop(); diff --git a/src/meta/MetaServiceUtils.cpp b/src/meta/MetaServiceUtils.cpp index 2ed8292d164..ffc550dfad7 100644 --- a/src/meta/MetaServiceUtils.cpp +++ b/src/meta/MetaServiceUtils.cpp @@ -21,6 +21,7 @@ const std::string kUsersTable = "__users__"; // NOLINT const std::string kRolesTable = "__roles__"; // NOLINT const std::string kHostOnline = "Online"; // NOLINT +const std::string kHostKVStoreOnline = "KVStoreOnline"; // NOLINT const std::string kHostOffline = "Offline"; // NOLINT std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) { @@ -112,6 +113,10 @@ std::string MetaServiceUtils::hostValOnline() { return kHostOnline; } +std::string MetaServiceUtils::hostValKVStoreOnline() { + return kHostKVStoreOnline; +} + std::string MetaServiceUtils::hostValOffline() { return kHostOffline; } diff --git a/src/meta/MetaServiceUtils.h b/src/meta/MetaServiceUtils.h index fc1bf664770..108a0a0cc3f 100644 --- a/src/meta/MetaServiceUtils.h +++ b/src/meta/MetaServiceUtils.h @@ -50,6 +50,8 @@ class MetaServiceUtils final { static std::string hostValOnline(); + static std::string hostValKVStoreOnline(); + static std::string hostValOffline(); static const std::string& hostPrefix(); diff --git a/src/meta/test/ActiveHostsManTest.cpp b/src/meta/test/ActiveHostsManTest.cpp index 4970569b76d..9fc765a28cc 100644 --- a/src/meta/test/ActiveHostsManTest.cpp +++ b/src/meta/test/ActiveHostsManTest.cpp @@ -37,6 +37,32 @@ TEST(ActiveHostsManTest, NormalTest) { } } +TEST(ActiveHostsManTest, NormalTest2) { + fs::TempDir rootPath("/tmp/ActiveHostsMergeTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + auto now = time::TimeUtils::nowInSeconds(); + ActiveHostsMan ahMan(1, 2, kv.get()); + ahMan.updateHostInfo(HostAddr(0, 0), HostInfo(now)); + ahMan.updateHostInfo(HostAddr(0, 1), HostInfo(now)); + ahMan.updateHostInfo(HostAddr(0, 2), HostInfo(now)); + ASSERT_EQ(3, ahMan.getActiveHosts().size()); + usleep(2200); + ASSERT_EQ(3, ahMan.getActiveHosts().size()); + { + std::vector data; + for (auto i = 0; i < 3; i++) { + data.emplace_back(MetaServiceUtils::hostKey(0, i), + MetaServiceUtils::hostValKVStoreOnline()); + } + kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), + [] (kvstore::ResultCode code) { + CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); + }); + } + usleep(200); + ASSERT_EQ(3, ahMan.getActiveHosts().size()); +} + } // namespace meta } // namespace nebula