Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use active hosts when creating space #425

Merged
merged 3 commits into from
May 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/graph/test/SchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "base/Base.h"
#include "graph/test/TestEnv.h"
#include "graph/test/TestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);

Expand Down Expand Up @@ -34,6 +35,8 @@ TEST_F(SchemaTest, metaCommunication) {
std::string query = "ADD HOSTS 127.0.0.1:1000, 127.0.0.1:1100";
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
meta::TestUtils::registerHB(
network::NetworkUtils::toHosts("127.0.0.1:1000, 127.0.0.1:1100").value());
}
{
cpp2::ExecutionResponse resp;
Expand All @@ -54,7 +57,7 @@ TEST_F(SchemaTest, metaCommunication) {
}
{
cpp2::ExecutionResponse resp;
std::string query = "CREATE SPACE default_space(partition_num=9, replica_factor=3)";
std::string query = "CREATE SPACE default_space(partition_num=9, replica_factor=1)";
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
Expand All @@ -65,7 +68,7 @@ TEST_F(SchemaTest, metaCommunication) {
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<int, std::string, int, int>> expected{
{1, "default_space", 9, 3},
{1, "default_space", 9, 1},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
Expand Down Expand Up @@ -235,7 +238,7 @@ TEST_F(SchemaTest, metaCommunication) {
}
{
cpp2::ExecutionResponse resp;
std::string query = "CREATE SPACE my_space(partition_num=9, replica_factor=3)";
std::string query = "CREATE SPACE my_space(partition_num=9, replica_factor=1)";
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
Expand Down
1 change: 1 addition & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ enum ErrorCode {
E_EXISTED = -22,
E_NOT_FOUND = -23,
E_INVALID_HOST = -24,
E_UNSUPPORTED = -25,

// KV Failure
E_STORE_FAILURE = -31,
Expand Down
23 changes: 23 additions & 0 deletions src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include "thread/GenericWorker.h"
#include "time/TimeUtils.h"

DECLARE_int32(expired_hosts_check_interval_sec);
DECLARE_int32(expired_threshold_sec);

namespace nebula {
namespace meta {

Expand All @@ -31,6 +34,21 @@ struct HostInfo {
int64_t lastHBTimeInSec_ = 0;
};

class ActiveHostsMan;

class ActiveHostsManHolder final {
public:
ActiveHostsManHolder() = delete;
~ActiveHostsManHolder() = delete;

static ActiveHostsMan* hostsMan() {
static auto hostsMan
= std::make_unique<ActiveHostsMan>(FLAGS_expired_hosts_check_interval_sec,
FLAGS_expired_threshold_sec);
return hostsMan.get();
}
};

class ActiveHostsMan final {
FRIEND_TEST(ActiveHostsManTest, NormalTest);

Expand Down Expand Up @@ -77,6 +95,11 @@ class ActiveHostsMan final {
return hosts;
}

void reset() {
folly::RWSpinLock::WriteHolder rh(&lock_);
hostsMap_.clear();
}

protected:
void cleanExpiredHosts() {
int64_t now = time::TimeUtils::nowInSeconds();
Expand Down
1 change: 0 additions & 1 deletion src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,5 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, true);
}

} // namespace meta
} // namespace nebula
7 changes: 7 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ class BaseProcessor {
return thriftID;
}

nebula::cpp2::HostAddr toThriftHost(const HostAddr& host) {
nebula::cpp2::HostAddr tHost;
tHost.set_ip(host.first);
tHost.set_port(host.second);
return tHost;
}

/**
* General put function.
* */
Expand Down
18 changes: 12 additions & 6 deletions src/meta/processors/CreateSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include "meta/processors/CreateSpaceProcessor.h"
#include "meta/ActiveHostsMan.h"

namespace nebula {
namespace meta {
Expand All @@ -22,20 +23,25 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
return;
}
CHECK_EQ(Status::SpaceNotFound(), spaceRet.status());
auto ret = allHosts();
if (!ret.ok()) {
auto hosts = ActiveHostsManHolder::hostsMan()->getActiveHosts();
if (hosts.empty()) {
LOG(ERROR) << "Create Space Failed : No Hosts!";
resp_.set_code(cpp2::ErrorCode::E_NO_HOSTS);
onFinished();
return;
}
auto spaceId = autoIncrementId();
auto hosts = ret.value();
auto spaceName = properties.get_space_name();
auto partitionNum = properties.get_partition_num();
auto replicaFactor = properties.get_replica_factor();
VLOG(3) << "Create space " << spaceName << ", id " << spaceId;

if ((int32_t)hosts.size() < replicaFactor) {
LOG(ERROR) << "Not enough hosts existed for replica "
<< replicaFactor << ", hosts num " << hosts.size();
resp_.set_code(cpp2::ErrorCode::E_UNSUPPORTED);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsupport is the meaning that this operation does not implement.

Illegal_Argument will be better ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thrift will return its own failure if related method does not implement. So i think UNSUPPORTED is ok in this case.

onFinished();
return;
}
std::vector<kvstore::KV> data;
data.emplace_back(MetaServiceUtils::indexSpaceKey(spaceName),
std::string(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId)));
Expand All @@ -53,15 +59,15 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {

std::vector<nebula::cpp2::HostAddr>
CreateSpaceProcessor::pickHosts(PartitionID partId,
const std::vector<nebula::cpp2::HostAddr>& hosts,
const std::vector<HostAddr>& hosts,
int32_t replicaFactor) {
if (hosts.size() == 0) {
return std::vector<nebula::cpp2::HostAddr>();
}
auto startIndex = partId;
std::vector<nebula::cpp2::HostAddr> pickedHosts;
for (decltype(replicaFactor) i = 0; i < replicaFactor; i++) {
pickedHosts.emplace_back(hosts[startIndex++ % hosts.size()]);
pickedHosts.emplace_back(toThriftHost(hosts[startIndex++ % hosts.size()]));
}
return pickedHosts;
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/CreateSpaceProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class CreateSpaceProcessor : public BaseProcessor<cpp2::ExecResp> {
protected:
std::vector<nebula::cpp2::HostAddr> pickHosts(
PartitionID partId,
const std::vector<nebula::cpp2::HostAddr>& hosts,
const std::vector<HostAddr>& hosts,
int32_t replicaFactor);

private:
Expand Down
18 changes: 18 additions & 0 deletions src/meta/processors/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,28 @@ DEFINE_int32(expired_hosts_check_interval_sec, 20,
"Check the expired hosts at the interval");
DEFINE_int32(expired_threshold_sec, 10 * 60,
"Hosts will be expired in this time if no heartbeat received");
DEFINE_bool(hosts_whitelist_enabled, true, "Check host whether in whitelist when received hb");

namespace nebula {
namespace meta {

void HBProcessor::process(const cpp2::HBReq& req) {
HostAddr host(req.host.ip, req.host.port);
if (FLAGS_hosts_whitelist_enabled
&& hostExist(MetaServiceUtils::hostKey(host.first, host.second))
== Status::HostNotFound()) {
LOG(INFO) << "Reject unregistered host " << host << "!";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOG(ERROR)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it is not an error.

resp_.set_code(cpp2::ErrorCode::E_INVALID_HOST);
onFinished();
return;
}

LOG(INFO) << "Receive heartbeat from " << host;
HostInfo info;
info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds();
ActiveHostsManHolder::hostsMan()->updateHostInfo(host, info);
onFinished();
}

} // namespace meta
} // namespace nebula
Expand Down
26 changes: 1 addition & 25 deletions src/meta/processors/HBProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
#include "meta/ActiveHostsMan.h"
#include "time/TimeUtils.h"

DECLARE_int32(expired_hosts_check_interval_sec);
DECLARE_int32(expired_threshold_sec);

namespace nebula {
namespace meta {
Expand All @@ -27,33 +25,11 @@ class HBProcessor : public BaseProcessor<cpp2::HBResp> {
return new HBProcessor(kvstore);
}

void process(const cpp2::HBReq& req) {
HostAddr host(req.host.ip, req.host.port);
if (hostExist(MetaServiceUtils::hostKey(host.first, host.second))
== Status::HostNotFound()) {
LOG(INFO) << "Reject unregistered host " << host << "!";
resp_.set_code(cpp2::ErrorCode::E_INVALID_HOST);
onFinished();
return;
}

LOG(INFO) << "Receive heartbeat from " << host;
HostInfo info;
info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds();
hostsMan()->updateHostInfo(host, info);
onFinished();
}
void process(const cpp2::HBReq& req);

private:
explicit HBProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::HBResp>(kvstore) {}

static ActiveHostsMan* hostsMan() {
static auto hostsMan
= std::make_unique<ActiveHostsMan>(FLAGS_expired_hosts_check_interval_sec,
FLAGS_expired_threshold_sec);
return hostsMan.get();
}
};

} // namespace meta
Expand Down
4 changes: 2 additions & 2 deletions src/meta/test/HBProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ TEST(HBProcessorTest, HBTest) {
auto resp = std::move(f).get();
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code);
}
auto hosts = HBProcessor::hostsMan()->getActiveHosts();
auto hosts = ActiveHostsManHolder::hostsMan()->getActiveHosts();
ASSERT_EQ(5, hosts.size());
sleep(3);
ASSERT_EQ(0, HBProcessor::hostsMan()->getActiveHosts().size());
ASSERT_EQ(0, ActiveHostsManHolder::hostsMan()->getActiveHosts().size());

LOG(INFO) << "Test for invalid host!";
cpp2::HBReq req;
Expand Down
19 changes: 9 additions & 10 deletions src/meta/test/MetaClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "meta/ServerBasedSchemaManager.h"
#include "dataman/ResultSchemaProvider.h"
#include "meta/processors/HBProcessor.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);
DECLARE_int32(heartbeat_interval_secs);
Expand Down Expand Up @@ -47,14 +48,15 @@ TEST(MetaClientTest, InterfacesTest) {
std::vector<HostAddr> hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}};
auto r = client->addHosts(hosts).get();
ASSERT_TRUE(r.ok());
TestUtils::registerHB(hosts);
auto ret = client->listHosts().get();
ASSERT_TRUE(ret.ok());
ASSERT_EQ(hosts, ret.value());
}
{
// Test createSpace, listSpaces, getPartsAlloc.
{
auto ret = client->createSpace("default_space", 9, 3).get();
auto ret = client->createSpace("default_space", 8, 3).get();
ASSERT_TRUE(ret.ok()) << ret.status();
spaceId = ret.value();
}
Expand All @@ -68,12 +70,9 @@ TEST(MetaClientTest, InterfacesTest) {
{
auto ret = client->getPartsAlloc(spaceId).get();
ASSERT_TRUE(ret.ok()) << ret.status();
ASSERT_EQ(8, ret.value().size());
for (auto it = ret.value().begin(); it != ret.value().end(); it++) {
auto startIndex = it->first;
for (auto& h : it->second) {
ASSERT_EQ(startIndex++ % 4, h.first);
ASSERT_EQ(h.first, h.second);
}
ASSERT_EQ(3, it->second.size());
}
}
{
Expand Down Expand Up @@ -165,16 +164,14 @@ TEST(MetaClientTest, InterfacesTest) {
sleep(FLAGS_load_data_interval_secs + 1);
{
// Test cache interfaces
// For Host(0, 0) the parts should be 2, 3, 4, 6, 7, 8
auto partsMap = client->getPartsMapFromCache(HostAddr(0, 0));
ASSERT_EQ(1, partsMap.size());
ASSERT_EQ(6, partsMap[spaceId].size());
}
{
auto partMeta = client->getPartMetaFromCache(spaceId, 1);
int32_t startIndex = 1;
ASSERT_EQ(3, partMeta.peers_.size());
for (auto& h : partMeta.peers_) {
ASSERT_EQ(startIndex++ % 4, h.first);
ASSERT_EQ(h.first, h.second);
}
}
Expand Down Expand Up @@ -287,6 +284,7 @@ TEST(MetaClientTest, TagTest) {
std::vector<HostAddr> hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}};
auto r = client->addHosts(hosts).get();
ASSERT_TRUE(r.ok());
TestUtils::registerHB(hosts);
auto ret = client->createSpace("default_space", 9, 3).get();
ASSERT_TRUE(ret.ok()) << ret.status();
spaceId = ret.value();
Expand Down Expand Up @@ -387,6 +385,7 @@ TEST(MetaClientTest, DiffTest) {
std::vector<HostAddr> hosts = {{0, 0}};
auto r = client->addHosts(hosts).get();
ASSERT_TRUE(r.ok());
TestUtils::registerHB(hosts);
auto ret = client->listHosts().get();
ASSERT_TRUE(ret.ok());
ASSERT_EQ(hosts, ret.value());
Expand Down Expand Up @@ -441,7 +440,7 @@ TEST(MetaClientTest, HeartbeatTest) {
ASSERT_EQ(hosts, ret.value());
}
sleep(FLAGS_heartbeat_interval_secs + 1);
ASSERT_EQ(1, HBProcessor::hostsMan()->getActiveHosts().size());
ASSERT_EQ(1, ActiveHostsManHolder::hostsMan()->getActiveHosts().size());
}

} // namespace meta
Expand Down
12 changes: 7 additions & 5 deletions src/meta/test/ProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,10 @@ TEST(ProcessorTest, CreateSpaceTest) {
{
cpp2::SpaceProperties properties;
properties.set_space_name("default_space");
properties.set_partition_num(9);
properties.set_partition_num(8);
properties.set_replica_factor(3);
cpp2::CreateSpaceReq req;
req.set_properties(std::move(properties));

auto* processor = CreateSpaceProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
Expand Down Expand Up @@ -155,18 +154,21 @@ TEST(ProcessorTest, CreateSpaceTest) {
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code);
std::unordered_map<HostAddr, std::set<PartitionID>> hostsParts;
for (auto& p : resp.get_parts()) {
auto startIndex = p.first;
for (auto& h : p.second) {
ASSERT_EQ(startIndex++ % hostsNum, h.get_ip());
hostsParts[std::make_pair(h.get_ip(), h.get_port())].insert(p.first);
ASSERT_EQ(h.get_ip(), h.get_port());
}
}
ASSERT_EQ(hostsNum, hostsParts.size());
for (auto it = hostsParts.begin(); it != hostsParts.end(); it++) {
ASSERT_EQ(6, it->second.size());
}
}
{
cpp2::DropSpaceReq req;
req.set_space_name("default_space");

auto* processor = DropSpaceProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
Expand Down
Loading