From 51f07e5c28aff324579d266262b98d718a1e4415 Mon Sep 17 00:00:00 2001
From: heng
Date: Thu, 9 May 2019 13:32:58 +0800
Subject: [PATCH] Adjust meta client ctor && implement heartbeat logic inside.

---
 src/daemons/GraphDaemon.cpp            | 11 ++---
 src/daemons/StorageDaemon.cpp          | 22 +++++++--
 src/graph/ExecutionEngine.cpp          |  8 ++-
 src/graph/GraphFlags.cpp               |  2 +
 src/graph/GraphFlags.h                 |  1 +
 src/graph/test/TestEnv.cpp             |  2 +-
 src/kvstore/PartManager.cpp            | 13 +----
 src/meta/SchemaManager.cpp             | 16 +-----
 src/meta/ServerBasedSchemaManager.cpp  | 11 +----
 src/meta/client/MetaClient.cpp         | 68 +++++++++++++++++++-------
 src/meta/client/MetaClient.h           | 16 ++++--
 src/meta/processors/HBProcessor.h      |  1 +
 src/meta/test/MetaClientTest.cpp       | 30 ++++++++++++
 src/storage/client/StorageClient.cpp   | 13 ++---
 src/storage/test/StorageClientTest.cpp | 15 +++---
 src/storage/test/TestUtils.h           |  5 +-
 16 files changed, 148 insertions(+), 86 deletions(-)

diff --git a/src/daemons/GraphDaemon.cpp b/src/daemons/GraphDaemon.cpp
index 0da32def668..fac10ba442f 100644
--- a/src/daemons/GraphDaemon.cpp
+++ b/src/daemons/GraphDaemon.cpp
@@ -111,7 +111,12 @@ int main(int argc, char *argv[]) {
         localIP = std::move(result).value();
     }
 
+    if (FLAGS_num_netio_threads <= 0) {
+        LOG(WARNING) << "Number of netio threads should be greater than zero";
+        return EXIT_FAILURE;
+    }
     gServer = std::make_unique();
+    gServer->getIOThreadPool()->setNumThreads(FLAGS_num_netio_threads);
     auto interface = std::make_shared(gServer->getIOThreadPool());
     gServer->setInterface(std::move(interface));
 
@@ -126,12 +131,6 @@ int main(int argc, char *argv[]) {
     gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
     gServer->setListenBacklog(FLAGS_listen_backlog);
     gServer->setThreadStackSizeMB(5);
-    if (FLAGS_num_netio_threads > 0) {
-        gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads);
-    } else {
-        LOG(WARNING) << "Number netio threads should be greater than zero";
-        return EXIT_FAILURE;
-    }
 
     // Setup the signal handlers
     status = setupSignalHandler();
diff --git a/src/daemons/StorageDaemon.cpp b/src/daemons/StorageDaemon.cpp
index 29552cedfa4..9da39728cc0 100644
--- a/src/daemons/StorageDaemon.cpp
+++ b/src/daemons/StorageDaemon.cpp
@@ -24,6 +24,9 @@ DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
 DEFINE_bool(mock_server, true, "start mock server");
 DEFINE_bool(daemonize, true, "Whether to run the process as a daemon");
 DEFINE_string(pid_file, "pids/nebula-storaged.pid", "");
+DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
+                                     "the format looks like ip1:port1, ip2:port2, ip3:port3");
+DEFINE_int32(io_handlers, 10, "io handlers");
 
 using nebula::Status;
 
@@ -72,7 +75,7 @@ int main(int argc, char *argv[]) {
     }
 
     if (FLAGS_data_path.empty()) {
-        LOG(FATAL) << "Storage Data Path should not empty";
+        LOG(ERROR) << "Storage Data Path should not be empty";
         return -1;
     }
     LOG(INFO) << "Starting the storage Daemon on port " << FLAGS_port
@@ -88,7 +91,15 @@ int main(int argc, char *argv[]) {
     uint32_t localIP;
     CHECK(NetworkUtils::ipv4ToInt(result.value(), localIP));
 
-    auto metaClient = std::make_unique();
+    if (FLAGS_meta_server_addrs.empty()) {
+        LOG(ERROR) << "meta_server_addrs flag should be set!";
+        return -1;
+    }
+    auto ioThreadPool = std::make_shared(FLAGS_io_handlers);
+    auto addrs = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
+    auto metaClient = std::make_unique(ioThreadPool,
+                                       std::move(addrs),
+                                       true);
     metaClient->init();
 
     nebula::kvstore::KVOptions options;
@@ -114,7 +125,10 @@ int main(int argc, char *argv[]) {
     auto handler = std::make_shared(kvstore.get(), std::move(schemaMan));
     gServer = std::make_unique();
-    CHECK(!!gServer) << "Failed to create the thrift server";
+    if (!gServer) {
+        LOG(ERROR) << "Failed to create the thrift server";
+        return -1;
+    }
 
     // Setup the signal handlers
     status = setupSignalHandler();
@@ -126,7 +140,7 @@ int main(int argc, char *argv[]) {
     gServer->setInterface(std::move(handler));
     gServer->setPort(FLAGS_port);
     gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection
-
+    gServer->setIOThreadPool(ioThreadPool);
     gServer->serve(); // Will wait until the server shuts down
 
     LOG(INFO) << "The storage Daemon on port " << FLAGS_port << " stopped";
diff --git a/src/graph/ExecutionEngine.cpp b/src/graph/ExecutionEngine.cpp
index fbfeb65edc7..0e83cd921b6 100644
--- a/src/graph/ExecutionEngine.cpp
+++ b/src/graph/ExecutionEngine.cpp
@@ -10,6 +10,8 @@
 #include "graph/ExecutionPlan.h"
 #include "storage/client/StorageClient.h"
 
+DECLARE_string(meta_server_addrs);
+
 namespace nebula {
 namespace graph {
 
@@ -22,7 +24,11 @@ ExecutionEngine::~ExecutionEngine() {
 Status ExecutionEngine::init(std::shared_ptr ioExecutor) {
-    metaClient_ = std::make_unique();
+    if (FLAGS_meta_server_addrs.empty()) {
+        return Status::Error("The meta_server_addrs flag should not be empty!");
+    }
+    auto addrs = network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
+    metaClient_ = std::make_unique(ioExecutor, std::move(addrs));
     metaClient_->init();
 
     schemaManager_ = meta::SchemaManager::create();
diff --git a/src/graph/GraphFlags.cpp b/src/graph/GraphFlags.cpp
index 5eb28ddaca6..e80101b7dc6 100644
--- a/src/graph/GraphFlags.cpp
+++ b/src/graph/GraphFlags.cpp
@@ -25,3 +25,5 @@ DEFINE_bool(redirect_stdout, true, "Whether to redirect stdout and stderr to sep
 DEFINE_string(stdout_log_file, "graphd-stdout.log", "Destination filename of stdout");
 DEFINE_string(stderr_log_file, "graphd-stderr.log", "Destination filename of stderr");
 DEFINE_bool(daemonize, true, "Whether run as a daemon process");
+DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
+                                     "the format looks like ip1:port1, ip2:port2, ip3:port3");
diff --git a/src/graph/GraphFlags.h b/src/graph/GraphFlags.h
index 39ff063f800..cfe4121484a 100644
--- a/src/graph/GraphFlags.h
+++ b/src/graph/GraphFlags.h
@@ -24,6 +24,7 @@ DECLARE_bool(redirect_stdout);
 DECLARE_string(stdout_log_file);
 DECLARE_string(stderr_log_file);
 DECLARE_bool(daemonize);
+DECLARE_string(meta_server_addrs);
 
 #endif // GRAPH_GRAPHFLAGS_H_
diff --git a/src/graph/test/TestEnv.cpp b/src/graph/test/TestEnv.cpp
index bf6815cd616..58c5f79337c 100644
--- a/src/graph/test/TestEnv.cpp
+++ b/src/graph/test/TestEnv.cpp
@@ -27,10 +27,10 @@ void TestEnv::SetUp() {
     FLAGS_load_data_interval_second = 1;
     using ThriftServer = apache::thrift::ThriftServer;
     server_ = std::make_unique();
+    server_->getIOThreadPool()->setNumThreads(1);
     auto interface = std::make_shared(server_->getIOThreadPool());
     server_->setInterface(std::move(interface));
     server_->setPort(0); // Let the system choose an available port for us
-
     auto serve = [this] {
         server_->serve();
     };
diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp
index f4f4ce3dc41..2d3bcb7217c 100644
--- a/src/kvstore/PartManager.cpp
+++ b/src/kvstore/PartManager.cpp
@@ -35,17 +35,8 @@ bool MemPartManager::partExist(const HostAddr& host, GraphSpaceID spaceId, Parti
 MetaServerBasedPartManager::MetaServerBasedPartManager(HostAddr host, meta::MetaClient *client)
     : localHost_(std::move(host)) {
-    if (nullptr == client) {
-        LOG(INFO) << "MetaClient is nullptr, create new one";
-        // multi instances use one metaclient
-        static auto clientPtr = std::make_unique();
-        static std::once_flag flag;
-        std::call_once(flag, std::bind(&meta::MetaClient::init, clientPtr.get()));
-        client_ = clientPtr.get();
-    } else {
-        client_ = client;
-    }
-
+    client_ = client;
+    CHECK_NOTNULL(client_);
     client_->registerListener(this);
 }
diff --git a/src/meta/SchemaManager.cpp b/src/meta/SchemaManager.cpp
index b40572f0017..8c803049f2a 100644
--- a/src/meta/SchemaManager.cpp
+++ b/src/meta/SchemaManager.cpp
@@ -6,26 +6,14 @@
 #include "base/Base.h"
 #include "meta/SchemaManager.h"
-#include "meta/FileBasedSchemaManager.h"
 #include "meta/ServerBasedSchemaManager.h"
 
-DECLARE_string(schema_file);
-DECLARE_string(meta_server_addrs);
-
 namespace nebula {
 namespace meta {
 
 std::unique_ptr SchemaManager::create() {
-    if (!FLAGS_schema_file.empty()) {
-        std::unique_ptr sm(new FileBasedSchemaManager());
-        return sm;
-    } else if (!FLAGS_meta_server_addrs.empty()) {
-        std::unique_ptr sm(new ServerBasedSchemaManager());
-        return sm;
-    } else {
-        std::unique_ptr sm(new AdHocSchemaManager());
-        return sm;
-    }
+    auto sm = std::unique_ptr(new ServerBasedSchemaManager());
+    return sm;
 }
 
 void AdHocSchemaManager::addTagSchema(GraphSpaceID space,
diff --git a/src/meta/ServerBasedSchemaManager.cpp b/src/meta/ServerBasedSchemaManager.cpp
index bc64d41e43e..8c597df577d 100644
--- a/src/meta/ServerBasedSchemaManager.cpp
+++ b/src/meta/ServerBasedSchemaManager.cpp
@@ -17,15 +17,8 @@ ServerBasedSchemaManager::~ServerBasedSchemaManager() {
 }
 
 void ServerBasedSchemaManager::init(MetaClient *client) {
-    if (nullptr == client) {
-        LOG(INFO) << "MetaClient is nullptr, create new one";
-        static auto clientPtr = std::make_unique();
-        static std::once_flag flag;
-        std::call_once(flag, std::bind(&meta::MetaClient::init, clientPtr.get()));
-        metaClient_ = clientPtr.get();
-    } else {
-        metaClient_ = client;
-    }
+    metaClient_ = client;
+    CHECK_NOTNULL(metaClient_);
 }
 
 std::shared_ptr
diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp
index a6c5473dbd4..6d212cbffa6 100644
--- a/src/meta/client/MetaClient.cpp
+++ b/src/meta/client/MetaClient.cpp
@@ -10,25 +10,18 @@
 #include "meta/NebulaSchemaProvider.h"
 
 DEFINE_int32(load_data_interval_second, 2 * 60, "Load data interval, unit: second");
-DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
-              "the format looks like ip1:port1, ip2:port2, ip3:port3");
-DEFINE_int32(meta_client_io_threads, 3, "meta client io threads");
+DEFINE_int32(heartbeat_interval_sec, 10, "Heartbeat interval, unit: second");
 
 namespace nebula {
 namespace meta {
 
 MetaClient::MetaClient(std::shared_ptr ioThreadPool,
-                       std::vector addrs)
+                       std::vector addrs,
+                       bool sendHeartBeat)
     : ioThreadPool_(ioThreadPool)
-    , addrs_(std::move(addrs)) {
-    if (ioThreadPool_ == nullptr) {
-        ioThreadPool_
-            = std::make_shared(FLAGS_meta_client_io_threads);
-    }
-    if (addrs_.empty() && !FLAGS_meta_server_addrs.empty()) {
-        addrs_ = network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
-    }
-    CHECK(!addrs_.empty());
+    , addrs_(std::move(addrs))
+    , sendHeartBeat_(sendHeartBeat) {
+    CHECK(ioThreadPool_ != nullptr && !addrs_.empty());
     clientsMan_ = std::make_shared>();
     updateHost();
@@ -44,13 +37,39 @@ MetaClient::~MetaClient() {
 void MetaClient::init() {
     loadDataThreadFunc();
     CHECK(loadDataThread_.start());
-    size_t delayMS = FLAGS_load_data_interval_second * 1000 + folly::Random::rand32(900);
-    loadDataThread_.addTimerTask(delayMS,
-                                 FLAGS_load_data_interval_second * 1000,
-                                 &MetaClient::loadDataThreadFunc, this);
+    {
+        size_t delayMS = FLAGS_load_data_interval_second * 1000 + folly::Random::rand32(900);
+        LOG(INFO) << "Register timer task for load data!";
+        loadDataThread_.addTimerTask(delayMS,
+                                     FLAGS_load_data_interval_second * 1000,
+                                     &MetaClient::loadDataThreadFunc, this);
+    }
+    if (sendHeartBeat_) {
+        LOG(INFO) << "Register timer task for heartbeat!";
+        size_t delayMS = FLAGS_heartbeat_interval_sec * 1000 + folly::Random::rand32(900);
+        loadDataThread_.addTimerTask(delayMS,
+                                     FLAGS_heartbeat_interval_sec * 1000,
+                                     &MetaClient::heartBeatThreadFunc, this);
+    }
+}
+
+void MetaClient::heartBeatThreadFunc() {
+    if (listener_ == nullptr) {
+        VLOG(1) << "Can't send heartbeat because listener_ is nullptr!";
+        return;
+    }
+    auto ret = heartbeat().get();
+    if (!ret.ok()) {
+        LOG(ERROR) << "Heartbeat failed, status:" << ret.status();
+        return;
+    }
 }
 
 void MetaClient::loadDataThreadFunc() {
+    if (ioThreadPool_->numThreads() <= 0) {
+        LOG(ERROR) << "The number of threads in ioThreadPool should be greater than 0";
+        return;
+    }
     auto ret = listSpaces().get();
     if (!ret.ok()) {
         LOG(ERROR) << "List space failed, status:" << ret.status();
@@ -824,5 +843,20 @@ SchemaVer MetaClient::getNewestEdgeVerFromCache(const GraphSpaceID& space,
     return it->second;
 }
 
+folly::Future> MetaClient::heartbeat() {
+    CHECK_NOTNULL(listener_);
+    auto localHost = listener_->getLocalHost();
+    cpp2::HBReq req;
+    nebula::cpp2::HostAddr thriftHost;
+    thriftHost.set_ip(localHost.first);
+    thriftHost.set_port(localHost.second);
+    req.set_host(std::move(thriftHost));
+    return getResponse(std::move(req), [] (auto client, auto request) {
+        return client->future_heartBeat(request);
+    }, [] (cpp2::HBResp&& resp) -> decltype(auto) {
+        return resp.code == cpp2::ErrorCode::SUCCEEDED;
+    }, true);
+}
+
 } // namespace meta
 } // namespace nebula
diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h
index 9a6c5a15b31..b171d944932 100644
--- a/src/meta/client/MetaClient.h
+++ b/src/meta/client/MetaClient.h
@@ -59,8 +59,9 @@ class MetaChangedListener {
 class MetaClient {
 public:
-    explicit MetaClient(std::shared_ptr ioThreadPool = nullptr,
-                        std::vector addrs = {});
+    explicit MetaClient(std::shared_ptr ioThreadPool,
+                        std::vector addrs,
+                        bool sendHeartBeat = false);
 
     virtual ~MetaClient();
 
@@ -182,6 +183,8 @@ class MetaClient {
 protected:
     void loadDataThreadFunc();
 
+    void heartBeatThreadFunc();
+
     bool loadSchemas(GraphSpaceID spaceId,
                      std::shared_ptr spaceInfoCache,
                      SpaceTagNameIdMap &tagNameIdMap,
@@ -189,6 +192,8 @@ class MetaClient {
                      SpaceNewestTagVerMap &newestTagVerMap,
                      SpaceNewestEdgeVerMap &newestEdgeVerMap);
 
+    folly::Future> heartbeat();
+
     std::unordered_map> reverse(const PartsAlloc& parts);
 
     void updateHost() {
@@ -228,20 +233,21 @@ class MetaClient {
 private:
     std::shared_ptr ioThreadPool_;
     std::shared_ptr> clientsMan_;
+    std::unordered_map> localCache_;
     std::vector addrs_;
     // The lock used to protect active_ and leader_.
     folly::RWSpinLock hostLock_;
     HostAddr active_;
     HostAddr leader_;
     thread::GenericWorker loadDataThread_;
-    std::unordered_map> localCache_;
     SpaceNameIdMap spaceIndexByName_;
     SpaceTagNameIdMap spaceTagIndexByName_;
     SpaceEdgeNameTypeMap spaceEdgeIndexByName_;
     SpaceNewestTagVerMap spaceNewestTagVerMap_;
     SpaceNewestEdgeVerMap spaceNewestEdgeVerMap_;
-    folly::RWSpinLock localCacheLock_;
-    MetaChangedListener* listener_{nullptr};
+    folly::RWSpinLock localCacheLock_;
+    MetaChangedListener* listener_{nullptr};
+    bool sendHeartBeat_ = false;
 };
 
 } // namespace meta
 } // namespace nebula
diff --git a/src/meta/processors/HBProcessor.h b/src/meta/processors/HBProcessor.h
index a449a52f0c3..922e9206f99 100644
--- a/src/meta/processors/HBProcessor.h
+++ b/src/meta/processors/HBProcessor.h
@@ -20,6 +20,7 @@ namespace meta {
 
 class HBProcessor : public BaseProcessor {
     FRIEND_TEST(HBProcessorTest, HBTest);
+    FRIEND_TEST(MetaClientTest, HeartbeatTest);
 
 public:
     static HBProcessor* instance(kvstore::KVStore* kvstore) {
diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp
index 6672b83a169..6f755b5cbf7 100644
--- a/src/meta/test/MetaClientTest.cpp
+++ b/src/meta/test/MetaClientTest.cpp
@@ -13,8 +13,10 @@
 #include "meta/MetaServiceUtils.h"
 #include "meta/ServerBasedSchemaManager.h"
 #include "dataman/ResultSchemaProvider.h"
+#include "meta/processors/HBProcessor.h"
 
 DECLARE_int32(load_data_interval_second);
+DECLARE_int32(heartbeat_interval_sec);
 
 namespace nebula {
 namespace meta {
@@ -401,6 +403,34 @@ TEST(MetaClientTest, DiffTest) {
     ASSERT_EQ(9, listener->partNum);
 }
 
+TEST(MetaClientTest, HeartbeatTest) {
+    FLAGS_load_data_interval_second = 5;
+    FLAGS_heartbeat_interval_sec = 1;
+    fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX");
+    auto sc = TestUtils::mockServer(10001, rootPath.path());
+
+    auto threadPool = std::make_shared(1);
+    uint32_t localIp;
+    network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp);
+    auto listener = std::make_unique();
+    auto client = std::make_shared(threadPool,
+                                   std::vector{HostAddr(localIp, 10001)},
+                                   true); // send heartbeat
+    client->registerListener(listener.get());
+    client->init();
+    {
+        // Test addHost, listHosts interface.
+        std::vector hosts = {{0, 0}};
+        auto r = client->addHosts(hosts).get();
+        ASSERT_TRUE(r.ok());
+        auto ret = client->listHosts().get();
+        ASSERT_TRUE(ret.ok());
+        ASSERT_EQ(hosts, ret.value());
+    }
+    sleep(FLAGS_heartbeat_interval_sec + 1);
+    ASSERT_EQ(1, HBProcessor::hostsMan()->getActiveHosts().size());
+}
+
 } // namespace meta
 } // namespace nebula
diff --git a/src/storage/client/StorageClient.cpp b/src/storage/client/StorageClient.cpp
index f212c94f36f..96289b990f1 100644
--- a/src/storage/client/StorageClient.cpp
+++ b/src/storage/client/StorageClient.cpp
@@ -15,16 +15,9 @@ namespace storage {
 
 StorageClient::StorageClient(std::shared_ptr threadPool,
                              meta::MetaClient *client)
-    : ioThreadPool_(threadPool) {
-    if (nullptr == client) {
-        LOG(INFO) << "MetaClient is nullptr, create new one";
-        static auto clientPtr = std::make_unique();
-        static std::once_flag flag;
-        std::call_once(flag, std::bind(&meta::MetaClient::init, clientPtr.get()));
-        client_ = clientPtr.get();
-    } else {
-        client_ = client;
-    }
+    : ioThreadPool_(threadPool)
+    , client_(client) {
+    CHECK_NOTNULL(client_);
     clientsMan_ = std::make_unique>();
 }
diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp
index 7c99ac72d38..d820df8e7d1 100644
--- a/src/storage/test/StorageClientTest.cpp
+++ b/src/storage/test/StorageClientTest.cpp
@@ -28,26 +28,29 @@ TEST(StorageClientTest, VerticesInterfacesTest) {
     network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp);
     uint32_t localMetaPort = 10001;
     uint32_t localDataPort = 20002;
-    FLAGS_meta_server_addrs = folly::stringPrintf("127.0.0.1:%d", localMetaPort);
 
     LOG(INFO) << "Start meta server....";
     std::string metaPath = folly::stringPrintf("%s/meta", rootPath.path());
     auto metaServerContext = meta::TestUtils::mockServer(10001, metaPath.c_str());
 
+    LOG(INFO) << "Create meta client...";
+    auto threadPool = std::make_shared(1);
+    auto addrs = network::NetworkUtils::toHosts(folly::stringPrintf("127.0.0.1:%d", localMetaPort));
+    auto mClient = std::make_unique(threadPool, std::move(addrs), true);
+    mClient->init();
+
     LOG(INFO) << "Start data server....";
     std::string dataPath = folly::stringPrintf("%s/data", rootPath.path());
-    auto sc = TestUtils::mockServer(dataPath.c_str(), localIp, localDataPort);
+    auto sc = TestUtils::mockServer(mClient.get(), dataPath.c_str(), localIp, localDataPort);
 
     LOG(INFO) << "Add hosts and create space....";
-    auto mClient = std::make_unique();
-    mClient->init();
-    auto r = mClient->addHosts({HostAddr(localIp, localDataPort)}).get();
+
+    auto r = mClient->addHosts({HostAddr(localIp, localDataPort)}).get();
     ASSERT_TRUE(r.ok());
     auto ret = mClient->createSpace("default", 10, 1).get();
     spaceId = ret.value();
     sleep(2 * FLAGS_load_data_interval_second + 1);
-    auto threadPool = std::make_shared(1);
     auto client = std::make_unique(threadPool, mClient.get());
     // VerticesInterfacesTest(addVertices and getVertexProps)
     {
diff --git a/src/storage/test/TestUtils.h b/src/storage/test/TestUtils.h
index 9f847825176..12070813470 100644
--- a/src/storage/test/TestUtils.h
+++ b/src/storage/test/TestUtils.h
@@ -165,7 +165,8 @@ class TestUtils {
         uint32_t port_;
     };
 
-    static std::unique_ptr mockServer(const char* dataPath,
+    static std::unique_ptr mockServer(meta::MetaClient* mClient,
+                                      const char* dataPath,
                                       uint32_t ip,
                                       uint32_t port = 0) {
         auto sc = std::make_unique();
@@ -178,7 +179,7 @@ class TestUtils {
         options.local_ = HostAddr(ip, port);
         options.dataPaths_ = std::move(paths);
         options.partMan_
-            = std::make_unique(options.local_);
+            = std::make_unique(options.local_, mClient);
         kvstore::NebulaStore* kvPtr = static_cast(
             kvstore::KVStore::instance(std::move(options)));
         std::unique_ptr kv(kvPtr);
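
Usage sketch: the snippet below pulls together how a caller is expected to wire up the reworked MetaClient after this patch, mirroring what StorageDaemon.cpp and MetaClientTest.cpp do above. It is only a minimal sketch under stated assumptions: the template arguments (elided in the patch text above), the include paths, and the startMetaClient/listener names are illustrative guesses, not part of the patch.

    #include <memory>
    #include <folly/executors/IOThreadPoolExecutor.h>
    #include "meta/client/MetaClient.h"   // assumed include path
    #include "network/NetworkUtils.h"     // assumed include path

    // `listener` is any MetaChangedListener implementation (e.g. the kvstore part
    // manager registers itself in PartManager.cpp above). A listener is needed for
    // heartbeats because HBReq takes its host from listener->getLocalHost().
    std::unique_ptr<nebula::meta::MetaClient>
    startMetaClient(nebula::meta::MetaChangedListener* listener) {
        // The caller now owns the IO thread pool and the meta server address list;
        // MetaClient no longer falls back to FLAGS_meta_server_addrs internally.
        auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);
        auto addrs = nebula::network::NetworkUtils::toHosts("127.0.0.1:45500");

        // sendHeartBeat = true makes init() register the heartbeat timer task
        // (every FLAGS_heartbeat_interval_sec) alongside the load-data timer task.
        auto metaClient = std::make_unique<nebula::meta::MetaClient>(ioThreadPool,
                                                                     std::move(addrs),
                                                                     true);
        metaClient->registerListener(listener);
        metaClient->init();
        return metaClient;
    }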