Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support displaying online hosts in "SHOW HOSTS" command and save hosts info in rocskdb #450

Merged
merged 5 commits into from
Jun 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "thread/GenericThreadPool.h"
#include "kvstore/PartManager.h"
#include "kvstore/NebulaStore.h"
#include "meta/ActiveHostsMan.h"

using nebula::ProcessUtils;
using nebula::Status;
Expand Down Expand Up @@ -129,6 +130,7 @@ int main(int argc, char *argv[]) {
localhost);

auto handler = std::make_shared<nebula::meta::MetaServiceHandler>(kvstore.get());
nebula::meta::ActiveHostsMan::instance(kvstore.get());

nebula::operator<<(operator<<(LOG(INFO), "The meta deamon start on "), localhost);
try {
Expand Down
10 changes: 6 additions & 4 deletions src/graph/ShowExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ void ShowExecutor::showHosts() {

header.push_back("Ip");
header.push_back("Port");
header.push_back("Status");
resp_->set_column_names(std::move(header));

for (auto &host : retShowHosts) {
for (auto &status : retShowHosts) {
std::vector<cpp2::ColumnValue> row;
row.resize(2);
row[0].set_str(NetworkUtils::ipFromHostAddr(host));
row[1].set_str(folly::to<std::string>(NetworkUtils::portFromHostAddr(host)));
row.resize(3);
row[0].set_str(NetworkUtils::ipFromHostAddr(status.first));
row[1].set_str(folly::to<std::string>(NetworkUtils::portFromHostAddr(status.first)));
row[2].set_str(status.second);
rows.emplace_back();
rows.back().set_columns(std::move(row));
}
Expand Down
6 changes: 3 additions & 3 deletions src/graph/test/SchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ TEST_F(SchemaTest, metaCommunication) {
cpp2::ExecutionResponse resp;
std::string query = "SHOW HOSTS";
client->execute(query, resp);
std::vector<uniform_tuple_t<std::string, 2>> expected{
{"127.0.0.1", "1000"},
{"127.0.0.1", "1100"},
std::vector<uniform_tuple_t<std::string, 3>> expected{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better test one case with host offline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

roger that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SchemaTest has other ut which need to use active hosts. I add ut in meta/test/ProcessorTest.

{"127.0.0.1", "1000", "offline"},
{"127.0.0.1", "1100", "offline"},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
Expand Down
13 changes: 12 additions & 1 deletion src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ struct EdgeItem {
4: common.Schema schema,
}

enum HostStatus {
ONLINE = 0x00,
OFFLINE = 0x01,
UNKNOWN = 0x02,
} (cpp.enum_strict)

struct HostItem {
1: common.HostAddr hostAddr,
2: HostStatus status,
}

struct ExecResp {
1: ErrorCode code,
// For custom kv operations, it is useless.
Expand Down Expand Up @@ -220,7 +231,7 @@ struct ListHostsResp {
1: ErrorCode code,
// Valid if ret equals E_LEADER_CHANGED.
2: common.HostAddr leader,
3: list<common.HostAddr> hosts,
3: list<HostItem> hosts,
}

struct RemoveHostsReq {
Expand Down
122 changes: 122 additions & 0 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "meta/ActiveHostsMan.h"
#include "meta/MetaServiceUtils.h"
#include "meta/processors/BaseProcessor.h"

namespace nebula {
namespace meta {

ActiveHostsMan::ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds,
kvstore::KVStore* kv)
: intervalSeconds_(intervalSeconds)
, expirationInSeconds_(expiredSeconds) {
if (kv != nullptr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you allow to pass in an empty kv?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some ut which only use getActiveHosts pass in an empty kv

kvstore_ = kv;
}

CHECK_GT(intervalSeconds, 0)
<< "intervalSeconds " << intervalSeconds << " should > 0!";
CHECK_GE(expiredSeconds, intervalSeconds)
<< "expiredSeconds " << expiredSeconds
<< " should >= intervalSeconds " << intervalSeconds;
CHECK(checkThread_.start());
checkThread_.addTimerTask(intervalSeconds * 1000,
intervalSeconds * 1000,
&ActiveHostsMan::cleanExpiredHosts,
this);
}

void ActiveHostsMan::updateHostInfo(const HostAddr& hostAddr, const HostInfo& info) {
std::vector<kvstore::KV> data;
{
folly::RWSpinLock::ReadHolder rh(&lock_);
auto it = hostsMap_.find(hostAddr);
if (it == hostsMap_.end()) {
folly::RWSpinLock::UpgradedHolder uh(&lock_);
hostsMap_.emplace(hostAddr, std::move(info));
data.emplace_back(MetaServiceUtils::hostKey(hostAddr.first, hostAddr.second),
MetaServiceUtils::hostValOnline());
} else {
it->second.lastHBTimeInSec_ = info.lastHBTimeInSec_;
}
}
if (kvstore_ != nullptr && !data.empty()) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
kvstore_->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data),
[] (kvstore::ResultCode code) {
CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED);
});
}
}

std::vector<HostAddr> ActiveHostsMan::getActiveHosts() {
std::vector<HostAddr> hosts;
folly::RWSpinLock::ReadHolder rh(&lock_);
hosts.resize(hostsMap_.size());
std::transform(hostsMap_.begin(), hostsMap_.end(), hosts.begin(),
[](const auto& entry) -> decltype(auto) {
return entry.first;
});
return hosts;
}

void ActiveHostsMan::loadHostMap() {
if (kvstore_ == nullptr) {
return;
}

const auto& prefix = MetaServiceUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (ret != kvstore::ResultCode::SUCCEEDED) {
return;
}

while (iter->valid()) {
auto host = MetaServiceUtils::parseHostKey(iter->key());
HostInfo info;
info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds();
if (iter->val() == MetaServiceUtils::hostValOnline()) {
LOG(INFO) << "load host " << host.ip << ":" << host.port;
updateHostInfo({host.ip, host.port}, info);
}
iter->next();
}
}

void ActiveHostsMan::cleanExpiredHosts() {
int64_t now = time::TimeUtils::nowInSeconds();
std::vector<kvstore::KV> data;
{
folly::RWSpinLock::WriteHolder rh(&lock_);
auto it = hostsMap_.begin();
while (it != hostsMap_.end()) {
if ((now - it->second.lastHBTimeInSec_) > expirationInSeconds_) {
LOG(INFO) << it->first << " expired! last hb time "
<< it->second.lastHBTimeInSec_;
data.emplace_back(MetaServiceUtils::hostKey(it->first.first, it->first.second),
MetaServiceUtils::hostValOffline());
it = hostsMap_.erase(it);
} else {
it++;
}
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock scope should be finished here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

if (!data.empty() && kvstore_ != nullptr) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
LOG(INFO) << "set " << data.size() << " expired hosts to offline in meta rocksdb";
kvstore_->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data),
[] (kvstore::ResultCode code) {
CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED);
});
}
}

} // namespace meta
} // namespace nebula
85 changes: 23 additions & 62 deletions src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <gtest/gtest_prod.h>
#include "thread/GenericWorker.h"
#include "time/TimeUtils.h"
#include "kvstore/NebulaStore.h"

DECLARE_int32(expired_hosts_check_interval_sec);
DECLARE_int32(expired_threshold_sec);
Expand All @@ -34,95 +35,55 @@ struct HostInfo {
int64_t lastHBTimeInSec_ = 0;
};

class ActiveHostsMan;

class ActiveHostsManHolder final {
public:
ActiveHostsManHolder() = delete;
~ActiveHostsManHolder() = delete;

static ActiveHostsMan* hostsMan() {
static auto hostsMan
= std::make_unique<ActiveHostsMan>(FLAGS_expired_hosts_check_interval_sec,
FLAGS_expired_threshold_sec);
return hostsMan.get();
}
};

class ActiveHostsMan final {
FRIEND_TEST(ActiveHostsManTest, NormalTest);
FRIEND_TEST(ProcessorTest, ListHostsTest);

public:
ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds)
: intervalSeconds_(intervalSeconds)
, expirationInSeconds_(expiredSeconds) {
CHECK_GT(intervalSeconds, 0)
<< "intervalSeconds " << intervalSeconds << " should > 0!";
CHECK_GE(expiredSeconds, intervalSeconds)
<< "expiredSeconds " << expiredSeconds
<< " should >= intervalSeconds " << intervalSeconds;
CHECK(checkThread_.start());
checkThread_.addTimerTask(intervalSeconds * 1000,
intervalSeconds * 1000,
&ActiveHostsMan::cleanExpiredHosts,
this);
static ActiveHostsMan* instance(kvstore::KVStore* kv = nullptr) {
static auto activeHostsMan = std::unique_ptr<ActiveHostsMan>(
new ActiveHostsMan(FLAGS_expired_hosts_check_interval_sec,
FLAGS_expired_threshold_sec, kv));
static std::once_flag initFlag;
std::call_once(initFlag, &ActiveHostsMan::loadHostMap, activeHostsMan.get());
return activeHostsMan.get();
}

~ActiveHostsMan() {
checkThread_.stop();
checkThread_.wait();
}

void updateHostInfo(const HostAddr& hostAddr, const HostInfo& info) {
folly::RWSpinLock::ReadHolder rh(&lock_);
auto it = hostsMap_.find(hostAddr);
if (it == hostsMap_.end()) {
folly::RWSpinLock::UpgradedHolder uh(&lock_);
hostsMap_.emplace(std::move(hostAddr), std::move(info));
} else {
it->second.lastHBTimeInSec_ = info.lastHBTimeInSec_;
}
}
void updateHostInfo(const HostAddr& hostAddr, const HostInfo& info);

std::vector<HostAddr> getActiveHosts() {
std::vector<HostAddr> hosts;
folly::RWSpinLock::ReadHolder rh(&lock_);
hosts.resize(hostsMap_.size());
std::transform(hostsMap_.begin(), hostsMap_.end(), hosts.begin(),
[](const auto& entry) -> decltype(auto) {
return entry.first;
});
return hosts;
}
std::vector<HostAddr> getActiveHosts();

void reset() {
folly::RWSpinLock::WriteHolder rh(&lock_);
hostsMap_.clear();
}

protected:
void cleanExpiredHosts() {
int64_t now = time::TimeUtils::nowInSeconds();
folly::RWSpinLock::WriteHolder rh(&lock_);
auto it = hostsMap_.begin();
while (it != hostsMap_.end()) {
if ((now - it->second.lastHBTimeInSec_) > expirationInSeconds_) {
LOG(INFO) << it->first << " expired! last hb time "
<< it->second.lastHBTimeInSec_;
it = hostsMap_.erase(it);
} else {
it++;
}
}
}
void cleanExpiredHosts();

private:
ActiveHostsMan(int32_t intervalSeconds, int32_t expiredSeconds, kvstore::KVStore* kv = nullptr);

void loadHostMap();

void stopClean() {
checkThread_.stop();
checkThread_.wait();
}

folly::RWSpinLock lock_;
std::unordered_map<HostAddr, HostInfo> hostsMap_;
thread::GenericWorker checkThread_;
int32_t intervalSeconds_ = 0;
int32_t expirationInSeconds_ = 5 * 60;
kvstore::KVStore* kvstore_ = nullptr;
};

} // namespace meta
} // namespace nebula

Expand Down
1 change: 1 addition & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ add_library(
meta_service_handler OBJECT
MetaServiceHandler.cpp
MetaServiceUtils.cpp
ActiveHostsMan.cpp
processors/partsMan/AddHostsProcessor.cpp
processors/partsMan/ListHostsProcessor.cpp
processors/partsMan/RemoveHostsProcessor.cpp
Expand Down
11 changes: 9 additions & 2 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const std::string kTagsTable = "__tags__"; // NOLINT
const std::string kEdgesTable = "__edges__"; // NOLINT
const std::string kIndexTable = "__index__"; // NOLINT

const std::string kHostOnline = "Online"; // NOLINT
const std::string kHostOffline = "Offline"; // NOLINT

std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) {
std::string key;
key.reserve(128);
Expand Down Expand Up @@ -103,8 +106,12 @@ std::string MetaServiceUtils::hostKey(IPv4 ip, Port port) {
return key;
}

std::string MetaServiceUtils::hostVal() {
return "";
std::string MetaServiceUtils::hostValOnline() {
return kHostOnline;
}

std::string MetaServiceUtils::hostValOffline() {
return kHostOffline;
}

const std::string& MetaServiceUtils::hostPrefix() {
Expand Down
4 changes: 3 additions & 1 deletion src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ class MetaServiceUtils final {

static std::string hostKey(IPv4 ip, Port port);

static std::string hostVal();
static std::string hostValOnline();

static std::string hostValOffline();

static const std::string& hostPrefix();

Expand Down
Loading