-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support displaying online hosts in "SHOW HOSTS" command and save hosts info in rocskdb #450
Changes from 3 commits
468c670
ea19b8e
703a17f
7185e28
6161705
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* Copyright (c) 2019 vesoft inc. All rights reserved. | ||
* | ||
* This source code is licensed under Apache 2.0 License, | ||
* attached with Common Clause Condition 1.0, found in the LICENSES directory. | ||
*/ | ||
|
||
#include "meta/ActiveHostsMan.h" | ||
#include "meta/MetaServiceUtils.h" | ||
#include "meta/processors/BaseProcessor.h" | ||
|
||
namespace nebula { | ||
namespace meta { | ||
|
||
ActiveHostsMan::ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds, | ||
kvstore::KVStore* kv) | ||
: intervalSeconds_(intervalSeconds) | ||
, expirationInSeconds_(expiredSeconds) { | ||
if (kv != nullptr) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you allow to pass in an empty kv? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. some ut which only use getActiveHosts pass in an empty kv |
||
kvstore_ = kv; | ||
} | ||
|
||
CHECK_GT(intervalSeconds, 0) | ||
<< "intervalSeconds " << intervalSeconds << " should > 0!"; | ||
CHECK_GE(expiredSeconds, intervalSeconds) | ||
<< "expiredSeconds " << expiredSeconds | ||
<< " should >= intervalSeconds " << intervalSeconds; | ||
CHECK(checkThread_.start()); | ||
checkThread_.addTimerTask(intervalSeconds * 1000, | ||
intervalSeconds * 1000, | ||
&ActiveHostsMan::cleanExpiredHosts, | ||
this); | ||
} | ||
|
||
void ActiveHostsMan::updateHostInfo(const HostAddr& hostAddr, const HostInfo& info) { | ||
std::vector<kvstore::KV> data; | ||
{ | ||
folly::RWSpinLock::ReadHolder rh(&lock_); | ||
auto it = hostsMap_.find(hostAddr); | ||
if (it == hostsMap_.end()) { | ||
folly::RWSpinLock::UpgradedHolder uh(&lock_); | ||
hostsMap_.emplace(hostAddr, std::move(info)); | ||
data.emplace_back(MetaServiceUtils::hostKey(hostAddr.first, hostAddr.second), | ||
MetaServiceUtils::hostValOnline()); | ||
} else { | ||
it->second.lastHBTimeInSec_ = info.lastHBTimeInSec_; | ||
} | ||
} | ||
if (kvstore_ != nullptr && !data.empty()) { | ||
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); | ||
kvstore_->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), | ||
[] (kvstore::ResultCode code) { | ||
CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); | ||
}); | ||
} | ||
} | ||
|
||
std::vector<HostAddr> ActiveHostsMan::getActiveHosts() { | ||
std::vector<HostAddr> hosts; | ||
folly::RWSpinLock::ReadHolder rh(&lock_); | ||
hosts.resize(hostsMap_.size()); | ||
std::transform(hostsMap_.begin(), hostsMap_.end(), hosts.begin(), | ||
[](const auto& entry) -> decltype(auto) { | ||
return entry.first; | ||
}); | ||
return hosts; | ||
} | ||
|
||
void ActiveHostsMan::loadHostMap() { | ||
if (kvstore_ == nullptr) { | ||
return; | ||
} | ||
|
||
const auto& prefix = MetaServiceUtils::hostPrefix(); | ||
std::unique_ptr<kvstore::KVIterator> iter; | ||
auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); | ||
if (ret != kvstore::ResultCode::SUCCEEDED) { | ||
return; | ||
} | ||
|
||
while (iter->valid()) { | ||
auto host = MetaServiceUtils::parseHostKey(iter->key()); | ||
HostInfo info; | ||
info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds(); | ||
if (iter->val() == MetaServiceUtils::hostValOnline()) { | ||
LOG(INFO) << "load host " << host.ip << ":" << host.port; | ||
updateHostInfo({host.ip, host.port}, info); | ||
} | ||
iter->next(); | ||
} | ||
} | ||
|
||
void ActiveHostsMan::cleanExpiredHosts() { | ||
int64_t now = time::TimeUtils::nowInSeconds(); | ||
std::vector<kvstore::KV> data; | ||
{ | ||
folly::RWSpinLock::WriteHolder rh(&lock_); | ||
auto it = hostsMap_.begin(); | ||
while (it != hostsMap_.end()) { | ||
if ((now - it->second.lastHBTimeInSec_) > expirationInSeconds_) { | ||
LOG(INFO) << it->first << " expired! last hb time " | ||
<< it->second.lastHBTimeInSec_; | ||
data.emplace_back(MetaServiceUtils::hostKey(it->first.first, it->first.second), | ||
MetaServiceUtils::hostValOffline()); | ||
it = hostsMap_.erase(it); | ||
} else { | ||
it++; | ||
} | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The lock scope should be finished here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch |
||
if (!data.empty() && kvstore_ != nullptr) { | ||
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); | ||
LOG(INFO) << "set " << data.size() << " expired hosts to offline in meta rocksdb"; | ||
kvstore_->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), | ||
[] (kvstore::ResultCode code) { | ||
CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); | ||
}); | ||
} | ||
} | ||
|
||
} // namespace meta | ||
} // namespace nebula |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better test one case with host offline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
roger that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SchemaTest has other ut which need to use active hosts. I add ut in meta/test/ProcessorTest.