Skip to content

Commit

Permalink
merge host info from kvstore (#550)
Browse files Browse the repository at this point in the history
* merge host info from kvstore

* add read lock

* found to notFound

* align fix
  • Loading branch information
andychow01 authored and dangleptr committed Jul 2, 2019
1 parent a8bdaef commit a6bbebe
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 0 deletions.
21 changes: 21 additions & 0 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,27 @@ void ActiveHostsMan::cleanExpiredHosts() {
}
}

// merge host info from kvstore
if (kvstore_ != nullptr) {
const auto& prefix = MetaServiceUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (ret == kvstore::ResultCode::SUCCEEDED) {
while (iter->valid()) {
auto host = MetaServiceUtils::parseHostKey(iter->key());
if (iter->val() == MetaServiceUtils::hostValOnline()) {
folly::RWSpinLock::ReadHolder rh(&lock_);
bool notFound = hostsMap_.find({host.ip, host.port}) == hostsMap_.end();
if (notFound) {
data.emplace_back(MetaServiceUtils::hostKey(host.ip, host.port),
MetaServiceUtils::hostValOffline());
}
}
iter->next();
}
}
}

if (!data.empty() && kvstore_ != nullptr) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
LOG(INFO) << "set " << data.size() << " expired hosts to offline in meta rocksdb";
Expand Down
1 change: 1 addition & 0 deletions src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct HostInfo {

class ActiveHostsMan final {
FRIEND_TEST(ActiveHostsManTest, NormalTest);
FRIEND_TEST(ActiveHostsManTest, MergeHostInfo);
FRIEND_TEST(ProcessorTest, ListHostsTest);

public:
Expand Down
50 changes: 50 additions & 0 deletions src/meta/test/ActiveHostsManTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,56 @@ TEST(ActiveHostsManTest, NormalTest) {
}
}

TEST(ActiveHostsManTest, MergeHostInfo) {
fs::TempDir rootPath("/tmp/ActiveHostsMergeTest.XXXXXX");
std::unique_ptr<kvstore::KVStore> kv(TestUtils::initKV(rootPath.path()));
{
std::vector<kvstore::KV> data;
for (auto i = 0; i < 3; i++) {
data.emplace_back(MetaServiceUtils::hostKey(0, i),
MetaServiceUtils::hostValOnline());
}
kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data),
[] (kvstore::ResultCode code) {
CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED);
});
}

int onlineNum = 0;
int offlineNum = 0;
const auto& prefix = MetaServiceUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
while (iter->valid()) {
if (iter->val() == MetaServiceUtils::hostValOnline()) {
++onlineNum;
} else {
++offlineNum;
}
iter->next();
}
ASSERT_EQ(3, onlineNum);
ASSERT_EQ(0, offlineNum);

ActiveHostsMan ahMan(1, 1, kv.get());
sleep(ahMan.intervalSeconds_ + 1);
onlineNum = 0;
offlineNum = 0;
kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
while (iter->valid()) {
auto host = MetaServiceUtils::parseHostKey(iter->key());
LOG(INFO) << "ip: " << host.ip << ", port: " << host.port << ", status: " << iter->val();
if (iter->val() == MetaServiceUtils::hostValOnline()) {
++onlineNum;
} else {
++offlineNum;
}
iter->next();
}
ASSERT_EQ(0, onlineNum);
ASSERT_EQ(3, offlineNum);
}

} // namespace meta
} // namespace nebula

Expand Down

0 comments on commit a6bbebe

Please sign in to comment.