Skip to content

Commit

Permalink
Support displaying online hosts in "SHOW HOSTS" command [440]
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 committed May 30, 2019
1 parent fd3c240 commit 74061c8
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 20 deletions.
10 changes: 6 additions & 4 deletions src/graph/ShowExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ void ShowExecutor::showHosts() {

header.push_back("Ip");
header.push_back("Port");
header.push_back("Status");
resp_->set_column_names(std::move(header));

for (auto &host : retShowHosts) {
for (auto &status : retShowHosts) {
std::vector<cpp2::ColumnValue> row;
row.resize(2);
row[0].set_str(NetworkUtils::ipFromHostAddr(host));
row[1].set_str(folly::to<std::string>(NetworkUtils::portFromHostAddr(host)));
row.resize(3);
row[0].set_str(NetworkUtils::ipFromHostAddr(status.first));
row[1].set_str(folly::to<std::string>(NetworkUtils::portFromHostAddr(status.first)));
row[2].set_str(status.second);
rows.emplace_back();
rows.back().set_columns(std::move(row));
}
Expand Down
7 changes: 6 additions & 1 deletion src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ struct EdgeItem {
4: common.Schema schema,
}

struct HostItem {
1: common.HostAddr hostAddr,
2: string status,
}

struct ExecResp {
1: ErrorCode code,
// For custom kv operations, it is useless.
Expand Down Expand Up @@ -220,7 +225,7 @@ struct ListHostsResp {
1: ErrorCode code,
// Valid if ret equals E_LEADER_CHANGED.
2: common.HostAddr leader,
3: list<common.HostAddr> hosts,
3: list<HostItem> hosts,
}

struct RemoveHostsReq {
Expand Down
13 changes: 11 additions & 2 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,15 @@ std::vector<HostAddr> MetaClient::to(const std::vector<nebula::cpp2::HostAddr>&
return hosts;
}

std::vector<HostStatus> MetaClient::toHostStatus(const std::vector<cpp2::HostItem>& tHosts) {
std::vector<HostStatus> hosts;
hosts.resize(tHosts.size());
std::transform(tHosts.begin(), tHosts.end(), hosts.begin(), [](const auto& h) {
return HostStatus(HostAddr(h.hostAddr.get_ip(), h.hostAddr.get_port()), h.get_status());
});
return hosts;
}

std::vector<SpaceIdName> MetaClient::toSpaceIdName(const std::vector<cpp2::IdName>& tIdNames) {
std::vector<SpaceIdName> idNames;
idNames.resize(tIdNames.size());
Expand Down Expand Up @@ -438,12 +447,12 @@ folly::Future<StatusOr<bool>> MetaClient::addHosts(const std::vector<HostAddr>&
}, true);
}

folly::Future<StatusOr<std::vector<HostAddr>>> MetaClient::listHosts() {
folly::Future<StatusOr<std::vector<HostStatus>>> MetaClient::listHosts() {
cpp2::ListHostsReq req;
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_listHosts(request);
}, [this] (cpp2::ListHostsResp&& resp) -> decltype(auto) {
return this->to(resp.hosts);
return this->toHostStatus(resp.hosts);
});
}

Expand Down
5 changes: 4 additions & 1 deletion src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace meta {

using PartsAlloc = std::unordered_map<PartitionID, std::vector<HostAddr>>;
using SpaceIdName = std::pair<GraphSpaceID, std::string>;
using HostStatus = std::pair<HostAddr, std::string>;

// struct for in cache
using TagIDSchemas = std::unordered_map<std::pair<TagID, SchemaVer>,
Expand Down Expand Up @@ -90,7 +91,7 @@ class MetaClient {
folly::Future<StatusOr<bool>>
addHosts(const std::vector<HostAddr>& hosts);

folly::Future<StatusOr<std::vector<HostAddr>>>
folly::Future<StatusOr<std::vector<HostStatus>>>
listHosts();

folly::Future<StatusOr<bool>>
Expand Down Expand Up @@ -228,6 +229,8 @@ class MetaClient {

std::vector<HostAddr> to(const std::vector<nebula::cpp2::HostAddr>& hosts);

std::vector<HostStatus> toHostStatus(const std::vector<cpp2::HostItem>& thosts);

std::vector<SpaceIdName> toSpaceIdName(const std::vector<cpp2::IdName>& tIdNames);

PartsMap doGetPartsMap(const HostAddr& host,
Expand Down
5 changes: 5 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ class BaseProcessor {
* */
StatusOr<std::vector<nebula::cpp2::HostAddr>> allHosts();

/**
* Get all hosts
* */
StatusOr<std::vector<cpp2::HostItem>> allHostsWithStatus();

/**
* Get one auto-increment Id.
* */
Expand Down
39 changes: 39 additions & 0 deletions src/meta/processors/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "meta/MetaServiceUtils.h"
#include "meta/processors/BaseProcessor.h"
#include "meta/ActiveHostsMan.h"

namespace nebula {
namespace meta {
Expand Down Expand Up @@ -126,6 +127,44 @@ StatusOr<std::vector<nebula::cpp2::HostAddr>> BaseProcessor<RESP>::allHosts() {
return hosts;
}

template<typename RESP>
StatusOr<std::vector<cpp2::HostItem>> BaseProcessor<RESP>::allHostsWithStatus() {
std::vector<cpp2::HostItem> hostItems;
const auto& prefix = MetaServiceUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter);
if (ret != kvstore::ResultCode::SUCCEEDED) {
return Status::Error("Can't find any hosts");
}

auto hosts = ActiveHostsManHolder::hostsMan()->getActiveHosts();
std::vector<nebula::cpp2::HostAddr> activeHosts;
activeHosts.resize(hosts.size());
std::transform(hosts.begin(), hosts.end(), activeHosts.begin(), [](const auto& h) {
nebula::cpp2::HostAddr th;
th.set_ip(h.first);
th.set_port(h.second);
return th;
});

while (iter->valid()) {
cpp2::HostItem item;
nebula::cpp2::HostAddr host;
auto hostAddrPiece = iter->key().subpiece(prefix.size());
memcpy(&host, hostAddrPiece.data(), hostAddrPiece.size());
item.set_hostAddr(host);
if (std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end()) {
item.set_status("online");
} else {
item.set_status("offline");
}
hostItems.emplace_back(item);
iter->next();
}

return hostItems;
}

template<typename RESP>
int32_t BaseProcessor<RESP>::autoIncrementId() {
folly::SharedMutex::WriteHolder holder(LockUtils::idLock());
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/ListHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace meta {
void ListHostsProcessor::process(const cpp2::ListHostsReq& req) {
UNUSED(req);
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
auto ret = allHosts();
auto ret = allHostsWithStatus();
if (!ret.ok()) {
LOG(ERROR) << "List Hosts Failed : No hosts";
resp_.set_code(cpp2::ErrorCode::E_NO_HOSTS);
Expand Down
4 changes: 2 additions & 2 deletions src/meta/test/HBProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ TEST(HBProcessorTest, HBTest) {
auto resp = std::move(f).get();
ASSERT_EQ(10, resp.hosts.size());
for (auto i = 0; i < 10; i++) {
ASSERT_EQ(i, resp.hosts[i].ip);
ASSERT_EQ(i, resp.hosts[i].port);
ASSERT_EQ(i, resp.hosts[i].hostAddr.ip);
ASSERT_EQ(i, resp.hosts[i].hostAddr.port);
}
}
{
Expand Down
12 changes: 9 additions & 3 deletions src/meta/test/MetaClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ TEST(MetaClientTest, InterfacesTest) {
TestUtils::registerHB(hosts);
auto ret = client->listHosts().get();
ASSERT_TRUE(ret.ok());
ASSERT_EQ(hosts, ret.value());
for (auto i = 0u; i < hosts.size(); i++) {
ASSERT_EQ(hosts[i], ret.value()[i].first);
}
}
{
// Test createSpace, listSpaces, getPartsAlloc.
Expand Down Expand Up @@ -388,7 +390,9 @@ TEST(MetaClientTest, DiffTest) {
TestUtils::registerHB(hosts);
auto ret = client->listHosts().get();
ASSERT_TRUE(ret.ok());
ASSERT_EQ(hosts, ret.value());
for (auto i = 0u; i < hosts.size(); i++) {
ASSERT_EQ(hosts[i], ret.value()[i].first);
}
}
{
// Test Create Space and List Spaces
Expand Down Expand Up @@ -437,7 +441,9 @@ TEST(MetaClientTest, HeartbeatTest) {
ASSERT_TRUE(r.ok());
auto ret = client->listHosts().get();
ASSERT_TRUE(ret.ok());
ASSERT_EQ(hosts, ret.value());
for (auto i = 0u; i < hosts.size(); i++) {
ASSERT_EQ(hosts[i], ret.value()[i].first);
}
}
sleep(FLAGS_heartbeat_interval_secs + 1);
ASSERT_EQ(1, ActiveHostsManHolder::hostsMan()->getActiveHosts().size());
Expand Down
8 changes: 4 additions & 4 deletions src/meta/test/ProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ TEST(ProcessorTest, AddHostsTest) {
auto resp = std::move(f).get();
ASSERT_EQ(10, resp.hosts.size());
for (auto i = 0; i < 10; i++) {
ASSERT_EQ(i, resp.hosts[i].ip);
ASSERT_EQ(i, resp.hosts[i].port);
ASSERT_EQ(i, resp.hosts[i].hostAddr.ip);
ASSERT_EQ(i, resp.hosts[i].hostAddr.port);
}
}
{
Expand All @@ -89,8 +89,8 @@ TEST(ProcessorTest, AddHostsTest) {
auto resp = std::move(f).get();
ASSERT_EQ(20, resp.hosts.size());
for (auto i = 0; i < 20; i++) {
ASSERT_EQ(i, resp.hosts[i].ip);
ASSERT_EQ(i, resp.hosts[i].port);
ASSERT_EQ(i, resp.hosts[i].hostAddr.ip);
ASSERT_EQ(i, resp.hosts[i].hostAddr.port);
}
}
{
Expand Down
4 changes: 2 additions & 2 deletions src/meta/test/TestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ class TestUtils {
auto resp = std::move(f).get();
EXPECT_EQ(hosts.size(), resp.hosts.size());
for (decltype(hosts.size()) i = 0; i < hosts.size(); i++) {
EXPECT_EQ(hosts[i].first, resp.hosts[i].ip);
EXPECT_EQ(hosts[i].second, resp.hosts[i].port);
EXPECT_EQ(hosts[i].first, resp.hosts[i].hostAddr.ip);
EXPECT_EQ(hosts[i].second, resp.hosts[i].hostAddr.port);
}
}
return hosts.size();
Expand Down

0 comments on commit 74061c8

Please sign in to comment.