Skip to content

Commit

Permalink
Use active hosts when creating space (#425)
Browse files Browse the repository at this point in the history
* Use active hosts when creating space

* x
  • Loading branch information
dangleptr authored May 29, 2019
1 parent 3be03dc commit 86cc984
Show file tree
Hide file tree
Showing 15 changed files with 109 additions and 53 deletions.
9 changes: 6 additions & 3 deletions src/graph/test/SchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "base/Base.h"
#include "graph/test/TestEnv.h"
#include "graph/test/TestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);

Expand Down Expand Up @@ -34,6 +35,8 @@ TEST_F(SchemaTest, metaCommunication) {
std::string query = "ADD HOSTS 127.0.0.1:1000, 127.0.0.1:1100";
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
meta::TestUtils::registerHB(
network::NetworkUtils::toHosts("127.0.0.1:1000, 127.0.0.1:1100").value());
}
{
cpp2::ExecutionResponse resp;
Expand All @@ -54,7 +57,7 @@ TEST_F(SchemaTest, metaCommunication) {
}
{
cpp2::ExecutionResponse resp;
std::string query = "CREATE SPACE default_space(partition_num=9, replica_factor=3)";
std::string query = "CREATE SPACE default_space(partition_num=9, replica_factor=1)";
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
Expand All @@ -65,7 +68,7 @@ TEST_F(SchemaTest, metaCommunication) {
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<int, std::string, int, int>> expected{
{1, "default_space", 9, 3},
{1, "default_space", 9, 1},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
Expand Down Expand Up @@ -235,7 +238,7 @@ TEST_F(SchemaTest, metaCommunication) {
}
{
cpp2::ExecutionResponse resp;
std::string query = "CREATE SPACE my_space(partition_num=9, replica_factor=3)";
std::string query = "CREATE SPACE my_space(partition_num=9, replica_factor=1)";
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
Expand Down
1 change: 1 addition & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ enum ErrorCode {
E_EXISTED = -22,
E_NOT_FOUND = -23,
E_INVALID_HOST = -24,
E_UNSUPPORTED = -25,

// KV Failure
E_STORE_FAILURE = -31,
Expand Down
23 changes: 23 additions & 0 deletions src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include "thread/GenericWorker.h"
#include "time/TimeUtils.h"

DECLARE_int32(expired_hosts_check_interval_sec);
DECLARE_int32(expired_threshold_sec);

namespace nebula {
namespace meta {

Expand All @@ -31,6 +34,21 @@ struct HostInfo {
int64_t lastHBTimeInSec_ = 0;
};

class ActiveHostsMan;

class ActiveHostsManHolder final {
public:
ActiveHostsManHolder() = delete;
~ActiveHostsManHolder() = delete;

static ActiveHostsMan* hostsMan() {
static auto hostsMan
= std::make_unique<ActiveHostsMan>(FLAGS_expired_hosts_check_interval_sec,
FLAGS_expired_threshold_sec);
return hostsMan.get();
}
};

class ActiveHostsMan final {
FRIEND_TEST(ActiveHostsManTest, NormalTest);

Expand Down Expand Up @@ -77,6 +95,11 @@ class ActiveHostsMan final {
return hosts;
}

void reset() {
folly::RWSpinLock::WriteHolder rh(&lock_);
hostsMap_.clear();
}

protected:
void cleanExpiredHosts() {
int64_t now = time::TimeUtils::nowInSeconds();
Expand Down
1 change: 0 additions & 1 deletion src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,5 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, true);
}

} // namespace meta
} // namespace nebula
7 changes: 7 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ class BaseProcessor {
return thriftID;
}

nebula::cpp2::HostAddr toThriftHost(const HostAddr& host) {
nebula::cpp2::HostAddr tHost;
tHost.set_ip(host.first);
tHost.set_port(host.second);
return tHost;
}

/**
* General put function.
* */
Expand Down
18 changes: 12 additions & 6 deletions src/meta/processors/CreateSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include "meta/processors/CreateSpaceProcessor.h"
#include "meta/ActiveHostsMan.h"

namespace nebula {
namespace meta {
Expand All @@ -22,20 +23,25 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
return;
}
CHECK_EQ(Status::SpaceNotFound(), spaceRet.status());
auto ret = allHosts();
if (!ret.ok()) {
auto hosts = ActiveHostsManHolder::hostsMan()->getActiveHosts();
if (hosts.empty()) {
LOG(ERROR) << "Create Space Failed : No Hosts!";
resp_.set_code(cpp2::ErrorCode::E_NO_HOSTS);
onFinished();
return;
}
auto spaceId = autoIncrementId();
auto hosts = ret.value();
auto spaceName = properties.get_space_name();
auto partitionNum = properties.get_partition_num();
auto replicaFactor = properties.get_replica_factor();
VLOG(3) << "Create space " << spaceName << ", id " << spaceId;

if ((int32_t)hosts.size() < replicaFactor) {
LOG(ERROR) << "Not enough hosts existed for replica "
<< replicaFactor << ", hosts num " << hosts.size();
resp_.set_code(cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
std::vector<kvstore::KV> data;
data.emplace_back(MetaServiceUtils::indexSpaceKey(spaceName),
std::string(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId)));
Expand All @@ -53,15 +59,15 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {

std::vector<nebula::cpp2::HostAddr>
CreateSpaceProcessor::pickHosts(PartitionID partId,
const std::vector<nebula::cpp2::HostAddr>& hosts,
const std::vector<HostAddr>& hosts,
int32_t replicaFactor) {
if (hosts.size() == 0) {
return std::vector<nebula::cpp2::HostAddr>();
}
auto startIndex = partId;
std::vector<nebula::cpp2::HostAddr> pickedHosts;
for (decltype(replicaFactor) i = 0; i < replicaFactor; i++) {
pickedHosts.emplace_back(hosts[startIndex++ % hosts.size()]);
pickedHosts.emplace_back(toThriftHost(hosts[startIndex++ % hosts.size()]));
}
return pickedHosts;
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/CreateSpaceProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class CreateSpaceProcessor : public BaseProcessor<cpp2::ExecResp> {
protected:
std::vector<nebula::cpp2::HostAddr> pickHosts(
PartitionID partId,
const std::vector<nebula::cpp2::HostAddr>& hosts,
const std::vector<HostAddr>& hosts,
int32_t replicaFactor);

private:
Expand Down
18 changes: 18 additions & 0 deletions src/meta/processors/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,28 @@ DEFINE_int32(expired_hosts_check_interval_sec, 20,
"Check the expired hosts at the interval");
DEFINE_int32(expired_threshold_sec, 10 * 60,
"Hosts will be expired in this time if no heartbeat received");
DEFINE_bool(hosts_whitelist_enabled, true, "Check host whether in whitelist when received hb");

namespace nebula {
namespace meta {

void HBProcessor::process(const cpp2::HBReq& req) {
HostAddr host(req.host.ip, req.host.port);
if (FLAGS_hosts_whitelist_enabled
&& hostExist(MetaServiceUtils::hostKey(host.first, host.second))
== Status::HostNotFound()) {
LOG(INFO) << "Reject unregistered host " << host << "!";
resp_.set_code(cpp2::ErrorCode::E_INVALID_HOST);
onFinished();
return;
}

LOG(INFO) << "Receive heartbeat from " << host;
HostInfo info;
info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds();
ActiveHostsManHolder::hostsMan()->updateHostInfo(host, info);
onFinished();
}

} // namespace meta
} // namespace nebula
Expand Down
26 changes: 1 addition & 25 deletions src/meta/processors/HBProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
#include "meta/ActiveHostsMan.h"
#include "time/TimeUtils.h"

DECLARE_int32(expired_hosts_check_interval_sec);
DECLARE_int32(expired_threshold_sec);

namespace nebula {
namespace meta {
Expand All @@ -27,33 +25,11 @@ class HBProcessor : public BaseProcessor<cpp2::HBResp> {
return new HBProcessor(kvstore);
}

void process(const cpp2::HBReq& req) {
HostAddr host(req.host.ip, req.host.port);
if (hostExist(MetaServiceUtils::hostKey(host.first, host.second))
== Status::HostNotFound()) {
LOG(INFO) << "Reject unregistered host " << host << "!";
resp_.set_code(cpp2::ErrorCode::E_INVALID_HOST);
onFinished();
return;
}

LOG(INFO) << "Receive heartbeat from " << host;
HostInfo info;
info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds();
hostsMan()->updateHostInfo(host, info);
onFinished();
}
void process(const cpp2::HBReq& req);

private:
explicit HBProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::HBResp>(kvstore) {}

static ActiveHostsMan* hostsMan() {
static auto hostsMan
= std::make_unique<ActiveHostsMan>(FLAGS_expired_hosts_check_interval_sec,
FLAGS_expired_threshold_sec);
return hostsMan.get();
}
};

} // namespace meta
Expand Down
4 changes: 2 additions & 2 deletions src/meta/test/HBProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ TEST(HBProcessorTest, HBTest) {
auto resp = std::move(f).get();
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code);
}
auto hosts = HBProcessor::hostsMan()->getActiveHosts();
auto hosts = ActiveHostsManHolder::hostsMan()->getActiveHosts();
ASSERT_EQ(5, hosts.size());
sleep(3);
ASSERT_EQ(0, HBProcessor::hostsMan()->getActiveHosts().size());
ASSERT_EQ(0, ActiveHostsManHolder::hostsMan()->getActiveHosts().size());

LOG(INFO) << "Test for invalid host!";
cpp2::HBReq req;
Expand Down
19 changes: 9 additions & 10 deletions src/meta/test/MetaClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "meta/ServerBasedSchemaManager.h"
#include "dataman/ResultSchemaProvider.h"
#include "meta/processors/HBProcessor.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);
DECLARE_int32(heartbeat_interval_secs);
Expand Down Expand Up @@ -47,14 +48,15 @@ TEST(MetaClientTest, InterfacesTest) {
std::vector<HostAddr> hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}};
auto r = client->addHosts(hosts).get();
ASSERT_TRUE(r.ok());
TestUtils::registerHB(hosts);
auto ret = client->listHosts().get();
ASSERT_TRUE(ret.ok());
ASSERT_EQ(hosts, ret.value());
}
{
// Test createSpace, listSpaces, getPartsAlloc.
{
auto ret = client->createSpace("default_space", 9, 3).get();
auto ret = client->createSpace("default_space", 8, 3).get();
ASSERT_TRUE(ret.ok()) << ret.status();
spaceId = ret.value();
}
Expand All @@ -68,12 +70,9 @@ TEST(MetaClientTest, InterfacesTest) {
{
auto ret = client->getPartsAlloc(spaceId).get();
ASSERT_TRUE(ret.ok()) << ret.status();
ASSERT_EQ(8, ret.value().size());
for (auto it = ret.value().begin(); it != ret.value().end(); it++) {
auto startIndex = it->first;
for (auto& h : it->second) {
ASSERT_EQ(startIndex++ % 4, h.first);
ASSERT_EQ(h.first, h.second);
}
ASSERT_EQ(3, it->second.size());
}
}
{
Expand Down Expand Up @@ -165,16 +164,14 @@ TEST(MetaClientTest, InterfacesTest) {
sleep(FLAGS_load_data_interval_secs + 1);
{
// Test cache interfaces
// For Host(0, 0) the parts should be 2, 3, 4, 6, 7, 8
auto partsMap = client->getPartsMapFromCache(HostAddr(0, 0));
ASSERT_EQ(1, partsMap.size());
ASSERT_EQ(6, partsMap[spaceId].size());
}
{
auto partMeta = client->getPartMetaFromCache(spaceId, 1);
int32_t startIndex = 1;
ASSERT_EQ(3, partMeta.peers_.size());
for (auto& h : partMeta.peers_) {
ASSERT_EQ(startIndex++ % 4, h.first);
ASSERT_EQ(h.first, h.second);
}
}
Expand Down Expand Up @@ -287,6 +284,7 @@ TEST(MetaClientTest, TagTest) {
std::vector<HostAddr> hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}};
auto r = client->addHosts(hosts).get();
ASSERT_TRUE(r.ok());
TestUtils::registerHB(hosts);
auto ret = client->createSpace("default_space", 9, 3).get();
ASSERT_TRUE(ret.ok()) << ret.status();
spaceId = ret.value();
Expand Down Expand Up @@ -387,6 +385,7 @@ TEST(MetaClientTest, DiffTest) {
std::vector<HostAddr> hosts = {{0, 0}};
auto r = client->addHosts(hosts).get();
ASSERT_TRUE(r.ok());
TestUtils::registerHB(hosts);
auto ret = client->listHosts().get();
ASSERT_TRUE(ret.ok());
ASSERT_EQ(hosts, ret.value());
Expand Down Expand Up @@ -441,7 +440,7 @@ TEST(MetaClientTest, HeartbeatTest) {
ASSERT_EQ(hosts, ret.value());
}
sleep(FLAGS_heartbeat_interval_secs + 1);
ASSERT_EQ(1, HBProcessor::hostsMan()->getActiveHosts().size());
ASSERT_EQ(1, ActiveHostsManHolder::hostsMan()->getActiveHosts().size());
}

} // namespace meta
Expand Down
12 changes: 7 additions & 5 deletions src/meta/test/ProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,10 @@ TEST(ProcessorTest, CreateSpaceTest) {
{
cpp2::SpaceProperties properties;
properties.set_space_name("default_space");
properties.set_partition_num(9);
properties.set_partition_num(8);
properties.set_replica_factor(3);
cpp2::CreateSpaceReq req;
req.set_properties(std::move(properties));

auto* processor = CreateSpaceProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
Expand Down Expand Up @@ -155,18 +154,21 @@ TEST(ProcessorTest, CreateSpaceTest) {
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code);
std::unordered_map<HostAddr, std::set<PartitionID>> hostsParts;
for (auto& p : resp.get_parts()) {
auto startIndex = p.first;
for (auto& h : p.second) {
ASSERT_EQ(startIndex++ % hostsNum, h.get_ip());
hostsParts[std::make_pair(h.get_ip(), h.get_port())].insert(p.first);
ASSERT_EQ(h.get_ip(), h.get_port());
}
}
ASSERT_EQ(hostsNum, hostsParts.size());
for (auto it = hostsParts.begin(); it != hostsParts.end(); it++) {
ASSERT_EQ(6, it->second.size());
}
}
{
cpp2::DropSpaceReq req;
req.set_space_name("default_space");

auto* processor = DropSpaceProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
Expand Down
Loading

0 comments on commit 86cc984

Please sign in to comment.