diff --git a/src/graph/test/SchemaTest.cpp b/src/graph/test/SchemaTest.cpp index c27b4ff1302..ec3293f07ee 100644 --- a/src/graph/test/SchemaTest.cpp +++ b/src/graph/test/SchemaTest.cpp @@ -7,6 +7,7 @@ #include "base/Base.h" #include "graph/test/TestEnv.h" #include "graph/test/TestBase.h" +#include "meta/test/TestUtils.h" DECLARE_int32(load_data_interval_secs); @@ -34,6 +35,8 @@ TEST_F(SchemaTest, metaCommunication) { std::string query = "ADD HOSTS 127.0.0.1:1000, 127.0.0.1:1100"; auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + meta::TestUtils::registerHB( + network::NetworkUtils::toHosts("127.0.0.1:1000, 127.0.0.1:1100").value()); } { cpp2::ExecutionResponse resp; @@ -54,7 +57,7 @@ TEST_F(SchemaTest, metaCommunication) { } { cpp2::ExecutionResponse resp; - std::string query = "CREATE SPACE default_space(partition_num=9, replica_factor=3)"; + std::string query = "CREATE SPACE default_space(partition_num=9, replica_factor=1)"; auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } @@ -65,7 +68,7 @@ TEST_F(SchemaTest, metaCommunication) { auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); std::vector> expected{ - {1, "default_space", 9, 3}, + {1, "default_space", 9, 1}, }; ASSERT_TRUE(verifyResult(resp, expected)); } @@ -235,7 +238,7 @@ TEST_F(SchemaTest, metaCommunication) { } { cpp2::ExecutionResponse resp; - std::string query = "CREATE SPACE my_space(partition_num=9, replica_factor=3)"; + std::string query = "CREATE SPACE my_space(partition_num=9, replica_factor=1)"; auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 6c667fa4d08..4ec08c21af7 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -25,6 +25,7 @@ enum ErrorCode { E_EXISTED = -22, E_NOT_FOUND = -23, E_INVALID_HOST = -24, + E_UNSUPPORTED = -25, // KV Failure E_STORE_FAILURE = -31, diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index f2289333ed5..030c6173c77 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -12,6 +12,9 @@ #include "thread/GenericWorker.h" #include "time/TimeUtils.h" +DECLARE_int32(expired_hosts_check_interval_sec); +DECLARE_int32(expired_threshold_sec); + namespace nebula { namespace meta { @@ -31,6 +34,21 @@ struct HostInfo { int64_t lastHBTimeInSec_ = 0; }; +class ActiveHostsMan; + +class ActiveHostsManHolder final { +public: + ActiveHostsManHolder() = delete; + ~ActiveHostsManHolder() = delete; + + static ActiveHostsMan* hostsMan() { + static auto hostsMan + = std::make_unique(FLAGS_expired_hosts_check_interval_sec, + FLAGS_expired_threshold_sec); + return hostsMan.get(); + } +}; + class ActiveHostsMan final { FRIEND_TEST(ActiveHostsManTest, NormalTest); @@ -77,6 +95,11 @@ class ActiveHostsMan final { return hosts; } + void reset() { + folly::RWSpinLock::WriteHolder rh(&lock_); + hostsMap_.clear(); + } + protected: void cleanExpiredHosts() { int64_t now = time::TimeUtils::nowInSeconds(); diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index 742832dbdc3..6f668eaf585 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -871,6 +871,5 @@ folly::Future> MetaClient::heartbeat() { return resp.code == cpp2::ErrorCode::SUCCEEDED; }, true); } - } // namespace meta } // namespace nebula diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index aac631c6971..e54383ac21e 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -120,6 +120,13 @@ class BaseProcessor { return thriftID; } + nebula::cpp2::HostAddr toThriftHost(const HostAddr& host) { + nebula::cpp2::HostAddr tHost; + tHost.set_ip(host.first); + tHost.set_port(host.second); + return tHost; + } + /** * General put function. * */ diff --git a/src/meta/processors/CreateSpaceProcessor.cpp b/src/meta/processors/CreateSpaceProcessor.cpp index 744424f73fd..c1629f45c10 100644 --- a/src/meta/processors/CreateSpaceProcessor.cpp +++ b/src/meta/processors/CreateSpaceProcessor.cpp @@ -5,6 +5,7 @@ */ #include "meta/processors/CreateSpaceProcessor.h" +#include "meta/ActiveHostsMan.h" namespace nebula { namespace meta { @@ -22,20 +23,25 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { return; } CHECK_EQ(Status::SpaceNotFound(), spaceRet.status()); - auto ret = allHosts(); - if (!ret.ok()) { + auto hosts = ActiveHostsManHolder::hostsMan()->getActiveHosts(); + if (hosts.empty()) { LOG(ERROR) << "Create Space Failed : No Hosts!"; resp_.set_code(cpp2::ErrorCode::E_NO_HOSTS); onFinished(); return; } auto spaceId = autoIncrementId(); - auto hosts = ret.value(); auto spaceName = properties.get_space_name(); auto partitionNum = properties.get_partition_num(); auto replicaFactor = properties.get_replica_factor(); VLOG(3) << "Create space " << spaceName << ", id " << spaceId; - + if ((int32_t)hosts.size() < replicaFactor) { + LOG(ERROR) << "Not enough hosts existed for replica " + << replicaFactor << ", hosts num " << hosts.size(); + resp_.set_code(cpp2::ErrorCode::E_UNSUPPORTED); + onFinished(); + return; + } std::vector data; data.emplace_back(MetaServiceUtils::indexSpaceKey(spaceName), std::string(reinterpret_cast(&spaceId), sizeof(spaceId))); @@ -53,7 +59,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { std::vector CreateSpaceProcessor::pickHosts(PartitionID partId, - const std::vector& hosts, + const std::vector& hosts, int32_t replicaFactor) { if (hosts.size() == 0) { return std::vector(); @@ -61,7 +67,7 @@ CreateSpaceProcessor::pickHosts(PartitionID partId, auto startIndex = partId; std::vector pickedHosts; for (decltype(replicaFactor) i = 0; i < replicaFactor; i++) { - pickedHosts.emplace_back(hosts[startIndex++ % hosts.size()]); + pickedHosts.emplace_back(toThriftHost(hosts[startIndex++ % hosts.size()])); } return pickedHosts; } diff --git a/src/meta/processors/CreateSpaceProcessor.h b/src/meta/processors/CreateSpaceProcessor.h index 4c0d0e03f17..53345b78d6a 100644 --- a/src/meta/processors/CreateSpaceProcessor.h +++ b/src/meta/processors/CreateSpaceProcessor.h @@ -23,7 +23,7 @@ class CreateSpaceProcessor : public BaseProcessor { protected: std::vector pickHosts( PartitionID partId, - const std::vector& hosts, + const std::vector& hosts, int32_t replicaFactor); private: diff --git a/src/meta/processors/HBProcessor.cpp b/src/meta/processors/HBProcessor.cpp index d0456c96bcf..cde34e72216 100644 --- a/src/meta/processors/HBProcessor.cpp +++ b/src/meta/processors/HBProcessor.cpp @@ -11,10 +11,28 @@ DEFINE_int32(expired_hosts_check_interval_sec, 20, "Check the expired hosts at the interval"); DEFINE_int32(expired_threshold_sec, 10 * 60, "Hosts will be expired in this time if no heartbeat received"); +DEFINE_bool(hosts_whitelist_enabled, true, "Check host whether in whitelist when received hb"); namespace nebula { namespace meta { +void HBProcessor::process(const cpp2::HBReq& req) { + HostAddr host(req.host.ip, req.host.port); + if (FLAGS_hosts_whitelist_enabled + && hostExist(MetaServiceUtils::hostKey(host.first, host.second)) + == Status::HostNotFound()) { + LOG(INFO) << "Reject unregistered host " << host << "!"; + resp_.set_code(cpp2::ErrorCode::E_INVALID_HOST); + onFinished(); + return; + } + + LOG(INFO) << "Receive heartbeat from " << host; + HostInfo info; + info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds(); + ActiveHostsManHolder::hostsMan()->updateHostInfo(host, info); + onFinished(); +} } // namespace meta } // namespace nebula diff --git a/src/meta/processors/HBProcessor.h b/src/meta/processors/HBProcessor.h index ab31834ad48..a1206bada38 100644 --- a/src/meta/processors/HBProcessor.h +++ b/src/meta/processors/HBProcessor.h @@ -12,8 +12,6 @@ #include "meta/ActiveHostsMan.h" #include "time/TimeUtils.h" -DECLARE_int32(expired_hosts_check_interval_sec); -DECLARE_int32(expired_threshold_sec); namespace nebula { namespace meta { @@ -27,33 +25,11 @@ class HBProcessor : public BaseProcessor { return new HBProcessor(kvstore); } - void process(const cpp2::HBReq& req) { - HostAddr host(req.host.ip, req.host.port); - if (hostExist(MetaServiceUtils::hostKey(host.first, host.second)) - == Status::HostNotFound()) { - LOG(INFO) << "Reject unregistered host " << host << "!"; - resp_.set_code(cpp2::ErrorCode::E_INVALID_HOST); - onFinished(); - return; - } - - LOG(INFO) << "Receive heartbeat from " << host; - HostInfo info; - info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds(); - hostsMan()->updateHostInfo(host, info); - onFinished(); - } + void process(const cpp2::HBReq& req); private: explicit HBProcessor(kvstore::KVStore* kvstore) : BaseProcessor(kvstore) {} - - static ActiveHostsMan* hostsMan() { - static auto hostsMan - = std::make_unique(FLAGS_expired_hosts_check_interval_sec, - FLAGS_expired_threshold_sec); - return hostsMan.get(); - } }; } // namespace meta diff --git a/src/meta/test/HBProcessorTest.cpp b/src/meta/test/HBProcessorTest.cpp index cc15315955e..cf68b13b8cc 100644 --- a/src/meta/test/HBProcessorTest.cpp +++ b/src/meta/test/HBProcessorTest.cpp @@ -63,10 +63,10 @@ TEST(HBProcessorTest, HBTest) { auto resp = std::move(f).get(); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); } - auto hosts = HBProcessor::hostsMan()->getActiveHosts(); + auto hosts = ActiveHostsManHolder::hostsMan()->getActiveHosts(); ASSERT_EQ(5, hosts.size()); sleep(3); - ASSERT_EQ(0, HBProcessor::hostsMan()->getActiveHosts().size()); + ASSERT_EQ(0, ActiveHostsManHolder::hostsMan()->getActiveHosts().size()); LOG(INFO) << "Test for invalid host!"; cpp2::HBReq req; diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index 1844683bc63..b0f221c26bb 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -15,6 +15,7 @@ #include "meta/ServerBasedSchemaManager.h" #include "dataman/ResultSchemaProvider.h" #include "meta/processors/HBProcessor.h" +#include "meta/test/TestUtils.h" DECLARE_int32(load_data_interval_secs); DECLARE_int32(heartbeat_interval_secs); @@ -47,6 +48,7 @@ TEST(MetaClientTest, InterfacesTest) { std::vector hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}}; auto r = client->addHosts(hosts).get(); ASSERT_TRUE(r.ok()); + TestUtils::registerHB(hosts); auto ret = client->listHosts().get(); ASSERT_TRUE(ret.ok()); ASSERT_EQ(hosts, ret.value()); @@ -54,7 +56,7 @@ TEST(MetaClientTest, InterfacesTest) { { // Test createSpace, listSpaces, getPartsAlloc. { - auto ret = client->createSpace("default_space", 9, 3).get(); + auto ret = client->createSpace("default_space", 8, 3).get(); ASSERT_TRUE(ret.ok()) << ret.status(); spaceId = ret.value(); } @@ -68,12 +70,9 @@ TEST(MetaClientTest, InterfacesTest) { { auto ret = client->getPartsAlloc(spaceId).get(); ASSERT_TRUE(ret.ok()) << ret.status(); + ASSERT_EQ(8, ret.value().size()); for (auto it = ret.value().begin(); it != ret.value().end(); it++) { - auto startIndex = it->first; - for (auto& h : it->second) { - ASSERT_EQ(startIndex++ % 4, h.first); - ASSERT_EQ(h.first, h.second); - } + ASSERT_EQ(3, it->second.size()); } } { @@ -165,16 +164,14 @@ TEST(MetaClientTest, InterfacesTest) { sleep(FLAGS_load_data_interval_secs + 1); { // Test cache interfaces - // For Host(0, 0) the parts should be 2, 3, 4, 6, 7, 8 auto partsMap = client->getPartsMapFromCache(HostAddr(0, 0)); ASSERT_EQ(1, partsMap.size()); ASSERT_EQ(6, partsMap[spaceId].size()); } { auto partMeta = client->getPartMetaFromCache(spaceId, 1); - int32_t startIndex = 1; + ASSERT_EQ(3, partMeta.peers_.size()); for (auto& h : partMeta.peers_) { - ASSERT_EQ(startIndex++ % 4, h.first); ASSERT_EQ(h.first, h.second); } } @@ -287,6 +284,7 @@ TEST(MetaClientTest, TagTest) { std::vector hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}}; auto r = client->addHosts(hosts).get(); ASSERT_TRUE(r.ok()); + TestUtils::registerHB(hosts); auto ret = client->createSpace("default_space", 9, 3).get(); ASSERT_TRUE(ret.ok()) << ret.status(); spaceId = ret.value(); @@ -387,6 +385,7 @@ TEST(MetaClientTest, DiffTest) { std::vector hosts = {{0, 0}}; auto r = client->addHosts(hosts).get(); ASSERT_TRUE(r.ok()); + TestUtils::registerHB(hosts); auto ret = client->listHosts().get(); ASSERT_TRUE(ret.ok()); ASSERT_EQ(hosts, ret.value()); @@ -441,7 +440,7 @@ TEST(MetaClientTest, HeartbeatTest) { ASSERT_EQ(hosts, ret.value()); } sleep(FLAGS_heartbeat_interval_secs + 1); - ASSERT_EQ(1, HBProcessor::hostsMan()->getActiveHosts().size()); + ASSERT_EQ(1, ActiveHostsManHolder::hostsMan()->getActiveHosts().size()); } } // namespace meta diff --git a/src/meta/test/ProcessorTest.cpp b/src/meta/test/ProcessorTest.cpp index b8c8d814b28..6dcd68c84fe 100644 --- a/src/meta/test/ProcessorTest.cpp +++ b/src/meta/test/ProcessorTest.cpp @@ -123,11 +123,10 @@ TEST(ProcessorTest, CreateSpaceTest) { { cpp2::SpaceProperties properties; properties.set_space_name("default_space"); - properties.set_partition_num(9); + properties.set_partition_num(8); properties.set_replica_factor(3); cpp2::CreateSpaceReq req; req.set_properties(std::move(properties)); - auto* processor = CreateSpaceProcessor::instance(kv.get()); auto f = processor->getFuture(); processor->process(req); @@ -155,18 +154,21 @@ TEST(ProcessorTest, CreateSpaceTest) { processor->process(req); auto resp = std::move(f).get(); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + std::unordered_map> hostsParts; for (auto& p : resp.get_parts()) { - auto startIndex = p.first; for (auto& h : p.second) { - ASSERT_EQ(startIndex++ % hostsNum, h.get_ip()); + hostsParts[std::make_pair(h.get_ip(), h.get_port())].insert(p.first); ASSERT_EQ(h.get_ip(), h.get_port()); } } + ASSERT_EQ(hostsNum, hostsParts.size()); + for (auto it = hostsParts.begin(); it != hostsParts.end(); it++) { + ASSERT_EQ(6, it->second.size()); + } } { cpp2::DropSpaceReq req; req.set_space_name("default_space"); - auto* processor = DropSpaceProcessor::instance(kv.get()); auto f = processor->getFuture(); processor->process(req); diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index 177fc695b15..148bdfc0192 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -4,6 +4,9 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ +#ifndef META_TEST_TESTUTILS_H_ +#define META_TEST_TESTUTILS_H_ + #include "base/Base.h" #include "kvstore/KVStore.h" #include "kvstore/PartManager.h" @@ -13,6 +16,8 @@ #include "meta/MetaServiceHandler.h" #include #include "interface/gen-cpp2/common_types.h" +#include "time/TimeUtils.h" +#include "meta/ActiveHostsMan.h" DECLARE_string(part_man_type); @@ -52,6 +57,14 @@ class TestUtils { return column; } + static void registerHB(const std::vector& hosts) { + ActiveHostsManHolder::hostsMan()->reset(); + auto now = time::TimeUtils::nowInSeconds(); + for (auto& h : hosts) { + ActiveHostsManHolder::hostsMan()->updateHostInfo(h, HostInfo(now)); + } + } + static int32_t createSomeHosts(kvstore::KVStore* kv, std::vector hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}}) { @@ -72,6 +85,7 @@ class TestUtils { auto resp = std::move(f).get(); EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); } + registerHB(hosts); { cpp2::ListHostsReq req; auto* processor = ListHostsProcessor::instance(kv); @@ -191,3 +205,4 @@ class TestUtils { } // namespace meta } // namespace nebula +#endif // META_TEST_TESTUTILS_H_ diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp index 159265a0d04..b61ff17cc1c 100644 --- a/src/storage/test/StorageClientTest.cpp +++ b/src/storage/test/StorageClientTest.cpp @@ -54,6 +54,9 @@ TEST(StorageClientTest, VerticesInterfacesTest) { LOG(INFO) << "Add hosts and create space...."; auto r = mClient->addHosts({HostAddr(localIp, localDataPort)}).get(); ASSERT_TRUE(r.ok()); + while (meta::ActiveHostsManHolder::hostsMan()->getActiveHosts().size() == 0) { + usleep(1000); + } auto ret = mClient->createSpace("default", 10, 1).get(); spaceId = ret.value(); sleep(2 * FLAGS_load_data_interval_secs + 1); diff --git a/src/storage/test/TestUtils.h b/src/storage/test/TestUtils.h index e3a0ce12294..84d0ef323d7 100644 --- a/src/storage/test/TestUtils.h +++ b/src/storage/test/TestUtils.h @@ -4,6 +4,9 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ +#ifndef STORAGE_TEST_TESTUTILS_H_ +#define STORAGE_TEST_TESTUTILS_H_ + #include "base/Base.h" #include "kvstore/KVStore.h" #include "kvstore/PartManager.h" @@ -206,3 +209,4 @@ class TestUtils { } // namespace storage } // namespace nebula +#endif // STORAGE_TEST_TESTUTILS_H_