diff --git a/src/common/network/NetworkUtils.cpp b/src/common/network/NetworkUtils.cpp index a0162e80093..807ad76a8b4 100644 --- a/src/common/network/NetworkUtils.cpp +++ b/src/common/network/NetworkUtils.cpp @@ -204,36 +204,42 @@ std::string NetworkUtils::intToIPv4(uint32_t ip) { return buf; } - -HostAddr NetworkUtils::toHostAddr(const folly::StringPiece ip, int32_t port) { +StatusOr NetworkUtils::toHostAddr(folly::StringPiece ip, int32_t port) { uint32_t ipV4; - CHECK(ipv4ToInt(ip.toString(), ipV4)); + if (!ipv4ToInt(ip.toString(), ipV4)) { + return Status::Error("Bad ip format:%s", ip.start()); + } return std::make_pair(ipV4, port); } - -HostAddr NetworkUtils::toHostAddr(const folly::StringPiece ipPort) { +StatusOr NetworkUtils::toHostAddr(folly::StringPiece ipPort) { auto pos = ipPort.find(':'); - CHECK_NE(pos, folly::StringPiece::npos); + if (pos == folly::StringPiece::npos) { + return Status::Error("Bad peer format: %s", ipPort.start()); + } int32_t port; try { port = folly::to(ipPort.subpiece(pos + 1)); } catch (const std::exception& ex) { - LOG(FATAL) << "Bad ipPort: " << ex.what(); + return Status::Error("Bad port number, error: %s", ex.what()); } return toHostAddr(ipPort.subpiece(0, pos), port); } -std::vector NetworkUtils::toHosts(const std::string& peersStr) { +StatusOr> NetworkUtils::toHosts(const std::string& peersStr) { std::vector hosts; std::vector peers; folly::split(",", peersStr, peers, true); - hosts.resize(peers.size()); - std::transform(peers.begin(), peers.end(), hosts.begin(), [](auto& p) { - return network::NetworkUtils::toHostAddr(folly::trimWhitespace(p)); - }); + hosts.reserve(peers.size()); + for (auto& peerStr : peers) { + auto hostAddr = network::NetworkUtils::toHostAddr(folly::trimWhitespace(peerStr)); + if (!hostAddr.ok()) { + return hostAddr.status(); + } + hosts.emplace_back(hostAddr.value()); + } return hosts; } diff --git a/src/common/network/NetworkUtils.h b/src/common/network/NetworkUtils.h index 2c29af10804..ecf83071521 100644 --- a/src/common/network/NetworkUtils.h +++ b/src/common/network/NetworkUtils.h @@ -34,9 +34,9 @@ class NetworkUtils final { static uint16_t getAvailablePort(); // Convert the given IP (must be in the form of "xx.xx.xx.xx") and Port to a HostAddr - static HostAddr toHostAddr(const folly::StringPiece ip, int32_t port); + static StatusOr toHostAddr(folly::StringPiece ip, int32_t port); // Convert the given IP/Port (must be in the form of "xx.xx.xx.xx:pp") to a HostAddr - static HostAddr toHostAddr(const folly::StringPiece ipPort); + static StatusOr toHostAddr(folly::StringPiece ipPort); // Retrieve the string-form IP from the given HostAddr static std::string ipFromHostAddr(const HostAddr& host); // Retrieve the port number from the given HostAddr @@ -57,7 +57,8 @@ class NetworkUtils final { // Convert peers str which is a list of ipPort joined with comma into HostAddr list. // (Peers str format example: 192.168.1.1:10001, 192.168.1.2:10001) - static std::vector toHosts(const std::string& peersStr); + // Return Status::Error if peersStr is invalid. + static StatusOr> toHosts(const std::string& peersStr); private: }; diff --git a/src/daemons/GraphDaemon.cpp b/src/daemons/GraphDaemon.cpp index bd5695a0a12..c04d0500c3a 100644 --- a/src/daemons/GraphDaemon.cpp +++ b/src/daemons/GraphDaemon.cpp @@ -112,7 +112,12 @@ int main(int argc, char *argv[]) { localIP = std::move(result).value(); } + if (FLAGS_num_netio_threads <= 0) { + LOG(WARNING) << "Number netio threads should be greater than zero"; + return EXIT_FAILURE; + } gServer = std::make_unique(); + gServer->getIOThreadPool()->setNumThreads(FLAGS_num_netio_threads); auto interface = std::make_shared(gServer->getIOThreadPool()); gServer->setInterface(std::move(interface)); @@ -127,12 +132,6 @@ int main(int argc, char *argv[]) { gServer->setNumAcceptThreads(FLAGS_num_accept_threads); gServer->setListenBacklog(FLAGS_listen_backlog); gServer->setThreadStackSizeMB(5); - if (FLAGS_num_netio_threads > 0) { - gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads); - } else { - LOG(WARNING) << "Number netio threads should be greater than zero"; - return EXIT_FAILURE; - } // Setup the signal handlers status = setupSignalHandler(); diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index e63734b044a..0d8cac98c6a 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -27,21 +27,6 @@ DECLARE_string(part_man_type); DEFINE_string(pid_file, "pids/nebula-metad.pid", "File to hold the process id"); DEFINE_bool(daemonize, true, "Whether run as a daemon process"); -namespace nebula { - -std::vector toHosts(const std::string& peersStr) { - std::vector hosts; - std::vector peers; - folly::split(",", peersStr, peers, true); - std::transform(peers.begin(), peers.end(), hosts.begin(), [](auto& p) { - return network::NetworkUtils::toHostAddr(folly::trimWhitespace(p)); - }); - return hosts; -} - -} // namespace nebula - - static std::unique_ptr gServer; static void signalHandler(int sig); @@ -91,21 +76,38 @@ int main(int argc, char *argv[]) { LOG(ERROR) << "Failed to start web service: " << status; return EXIT_FAILURE; } - LOG(INFO) << "Starting the meta Daemon on port " << FLAGS_port - << ", dataPath " << FLAGS_data_path; auto result = nebula::network::NetworkUtils::getLocalIP(FLAGS_local_ip); - CHECK(result.ok()) << result.status(); - uint32_t localIP; - CHECK(nebula::network::NetworkUtils::ipv4ToInt(result.value(), localIP)); + if (!result.ok()) { + LOG(ERROR) << "Get local ip failed! status:" << result.status(); + return EXIT_FAILURE; + } + auto hostAddrRet = nebula::network::NetworkUtils::toHostAddr(result.value(), FLAGS_port); + if (!hostAddrRet.ok()) { + LOG(ERROR) << "Bad local host addr, status:" << hostAddrRet.status(); + return EXIT_FAILURE; + } + auto& localHost = hostAddrRet.value(); + + auto peersRet = nebula::network::NetworkUtils::toHosts(FLAGS_peers); + if (!peersRet.ok()) { + LOG(ERROR) << "Can't get peers address, status:" << peersRet.status(); + return EXIT_FAILURE; + } + // Setup the signal handlers + status = setupSignalHandler(); + if (!status.ok()) { + LOG(ERROR) << status; + return EXIT_FAILURE; + } auto partMan = std::make_unique(); // The meta server has only one space, one part. - partMan->addPart(0, 0, nebula::toHosts(FLAGS_peers)); + partMan->addPart(0, 0, std::move(peersRet.value())); nebula::kvstore::KVOptions options; - options.local_ = nebula::HostAddr(localIP, FLAGS_port); + options.local_ = localHost; options.dataPaths_ = {FLAGS_data_path}; options.partMan_ = std::move(partMan); std::unique_ptr kvstore( @@ -113,28 +115,19 @@ int main(int argc, char *argv[]) { auto handler = std::make_shared(kvstore.get()); - gServer = std::make_unique(); - CHECK(!!gServer) << "Failed to create the thrift server"; - - gServer->setInterface(std::move(handler)); - gServer->setPort(FLAGS_port); - gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection - - // Setup the signal handlers - status = setupSignalHandler(); - if (!status.ok()) { - LOG(ERROR) << status; - return EXIT_FAILURE; - } - + nebula::operator<<(operator<<(LOG(INFO), "The meta deamon start on "), localHost); try { + gServer = std::make_unique(); + gServer->setInterface(std::move(handler)); + gServer->setPort(FLAGS_port); + gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection gServer->serve(); // Will wait until the server shuts down } catch (const std::exception &e) { LOG(ERROR) << "Exception thrown: " << e.what(); return EXIT_FAILURE; } - LOG(INFO) << "The storage Daemon on port " << FLAGS_port << " stopped"; + LOG(INFO) << "The meta Daemon stopped"; } diff --git a/src/daemons/StorageDaemon.cpp b/src/daemons/StorageDaemon.cpp index 5a1cdbfe9fc..ce47d9c7678 100644 --- a/src/daemons/StorageDaemon.cpp +++ b/src/daemons/StorageDaemon.cpp @@ -24,6 +24,9 @@ DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP"); DEFINE_bool(mock_server, true, "start mock server"); DEFINE_bool(daemonize, true, "Whether to run the process as a daemon"); DEFINE_string(pid_file, "pids/nebula-storaged.pid", "File to hold the process id"); +DEFINE_string(meta_server_addrs, "", "list of meta server addresses," + "the format looks like ip1:port1, ip2:port2, ip3:port3"); +DEFINE_int32(io_handlers, 10, "io handlers"); using nebula::Status; using nebula::HostAddr; @@ -72,27 +75,47 @@ int main(int argc, char *argv[]) { } if (FLAGS_data_path.empty()) { - LOG(FATAL) << "Storage Data Path should not empty"; - return -1; + LOG(ERROR) << "Storage Data Path should not empty"; + return EXIT_FAILURE; + } + + auto result = nebula::network::NetworkUtils::getLocalIP(FLAGS_local_ip); + if (!result.ok()) { + LOG(ERROR) << "Get localIp failed, ip " << FLAGS_local_ip + << ", status:" << result.status(); + return EXIT_FAILURE; + } + auto hostRet = nebula::network::NetworkUtils::toHostAddr(result.value(), FLAGS_port); + if (!hostRet.ok()) { + LOG(ERROR) << "Bad local host addr, status:" << hostRet.status(); + return EXIT_FAILURE; + } + auto& localHost = hostRet.value(); + auto metaAddrsRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs); + if (!metaAddrsRet.ok() || metaAddrsRet.value().empty()) { + LOG(ERROR) << "Can't get metaServer address, status:" << metaAddrsRet.status() + << ", FLAGS_meta_server_addrs:" << FLAGS_meta_server_addrs; + return EXIT_FAILURE; } - LOG(INFO) << "Starting the storage Daemon on port " << FLAGS_port - << ", dataPath " << FLAGS_data_path; std::vector paths; folly::split(",", FLAGS_data_path, paths, true); std::transform(paths.begin(), paths.end(), paths.begin(), [](auto& p) { return folly::trimWhitespace(p).str(); }); - auto result = nebula::network::NetworkUtils::getLocalIP(FLAGS_local_ip); - CHECK(result.ok()) << result.status(); - uint32_t localIP; - CHECK(NetworkUtils::ipv4ToInt(result.value(), localIP)); + if (paths.empty()) { + LOG(ERROR) << "Bad data_path format:" << FLAGS_data_path; + return EXIT_FAILURE; + } - auto metaClient = std::make_unique(); + auto ioThreadPool = std::make_shared(FLAGS_io_handlers); + auto metaClient = std::make_unique(ioThreadPool, + std::move(metaAddrsRet.value()), + true); metaClient->init(); nebula::kvstore::KVOptions options; - options.local_ = HostAddr(localIP, FLAGS_port); + options.local_ = localHost; options.dataPaths_ = std::move(paths); options.partMan_ = std::make_unique( options.local_, metaClient.get()); @@ -111,11 +134,6 @@ int main(int argc, char *argv[]) { LOG(ERROR) << "Failed to start web service: " << status; return EXIT_FAILURE; } - - auto handler = std::make_shared(kvstore.get(), std::move(schemaMan)); - gServer = std::make_unique(); - CHECK(!!gServer) << "Failed to create the thrift server"; - // Setup the signal handlers status = setupSignalHandler(); if (!status.ok()) { @@ -123,13 +141,21 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } - gServer->setInterface(std::move(handler)); - gServer->setPort(FLAGS_port); - gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection - - gServer->serve(); // Will wait until the server shuts down + auto handler = std::make_shared(kvstore.get(), std::move(schemaMan)); + try { + nebula::operator<<(operator<<(LOG(INFO), "The storage deamon start on "), localHost); + gServer = std::make_unique(); + gServer->setInterface(std::move(handler)); + gServer->setPort(FLAGS_port); + gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection + gServer->setIOThreadPool(ioThreadPool); + gServer->serve(); // Will wait until the server shuts down + } catch (const std::exception& e) { + LOG(ERROR) << "Start thrift server failed, error:" << e.what(); + return EXIT_FAILURE; + } - LOG(INFO) << "The storage Daemon on port " << FLAGS_port << " stopped"; + LOG(INFO) << "The storage Daemon stopped"; } diff --git a/src/graph/ExecutionEngine.cpp b/src/graph/ExecutionEngine.cpp index f91388ea117..cab9571b771 100644 --- a/src/graph/ExecutionEngine.cpp +++ b/src/graph/ExecutionEngine.cpp @@ -10,6 +10,8 @@ #include "graph/ExecutionPlan.h" #include "storage/client/StorageClient.h" +DECLARE_string(meta_server_addrs); + namespace nebula { namespace graph { @@ -22,7 +24,11 @@ ExecutionEngine::~ExecutionEngine() { Status ExecutionEngine::init(std::shared_ptr ioExecutor) { - metaClient_ = std::make_unique(); + auto addrs = network::NetworkUtils::toHosts(FLAGS_meta_server_addrs); + if (!addrs.ok()) { + return addrs.status(); + } + metaClient_ = std::make_unique(ioExecutor, std::move(addrs.value())); metaClient_->init(); schemaManager_ = meta::SchemaManager::create(); diff --git a/src/graph/GraphFlags.cpp b/src/graph/GraphFlags.cpp index 00d374511ff..e951faf0152 100644 --- a/src/graph/GraphFlags.cpp +++ b/src/graph/GraphFlags.cpp @@ -25,3 +25,5 @@ DEFINE_bool(redirect_stdout, true, "Whether to redirect stdout and stderr to sep DEFINE_string(stdout_log_file, "graphd-stdout.log", "Destination filename of stdout"); DEFINE_string(stderr_log_file, "graphd-stderr.log", "Destination filename of stderr"); DEFINE_bool(daemonize, true, "Whether run as a daemon process"); +DEFINE_string(meta_server_addrs, "", "list of meta server addresses," + "the format looks like ip1:port1, ip2:port2, ip3:port3"); diff --git a/src/graph/GraphFlags.h b/src/graph/GraphFlags.h index 5046969e9d8..ebd10f4ca03 100644 --- a/src/graph/GraphFlags.h +++ b/src/graph/GraphFlags.h @@ -24,6 +24,7 @@ DECLARE_bool(redirect_stdout); DECLARE_string(stdout_log_file); DECLARE_string(stderr_log_file); DECLARE_bool(daemonize); +DECLARE_string(meta_server_addrs); #endif // GRAPH_GRAPHFLAGS_H_ diff --git a/src/graph/test/SchemaTest.cpp b/src/graph/test/SchemaTest.cpp index 862c855ca1b..72ff684fa93 100644 --- a/src/graph/test/SchemaTest.cpp +++ b/src/graph/test/SchemaTest.cpp @@ -8,7 +8,7 @@ #include "graph/test/TestEnv.h" #include "graph/test/TestBase.h" -DECLARE_int32(load_data_interval_second); +DECLARE_int32(load_data_interval_secs); namespace nebula { namespace graph { @@ -58,7 +58,7 @@ TEST_F(SchemaTest, metaCommunication) { auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); { cpp2::ExecutionResponse resp; std::string query = "USE default_space"; @@ -70,10 +70,10 @@ TEST_F(SchemaTest, metaCommunication) { std::string query = "CREATE TAG person(name string, email_addr string, " "age int, gender string, row_timestamp timestamp)"; auto code = client->execute(query, resp); - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); { cpp2::ExecutionResponse resp; std::string query = "DESCRIBE TAG person"; @@ -101,7 +101,7 @@ TEST_F(SchemaTest, metaCommunication) { auto code = client->execute(query, resp); ASSERT_NE(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); { cpp2::ExecutionResponse resp; std::string query = "DESCRIBE TAG account"; @@ -128,7 +128,7 @@ TEST_F(SchemaTest, metaCommunication) { auto code = client->execute(query, resp); ASSERT_NE(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); { cpp2::ExecutionResponse resp; std::string query = "DESCRIBE TAG account"; @@ -153,7 +153,7 @@ TEST_F(SchemaTest, metaCommunication) { auto code = client->execute(query, resp); ASSERT_NE(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); { cpp2::ExecutionResponse resp; std::string query = "DESCRIBE EDGE buy"; @@ -170,7 +170,7 @@ TEST_F(SchemaTest, metaCommunication) { auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); { cpp2::ExecutionResponse resp; std::string query = "DESCRIBE EDGE education"; @@ -186,7 +186,7 @@ TEST_F(SchemaTest, metaCommunication) { cpp2::ExecutionResponse resp; std::string query = "SHOW EDGES"; auto code = client->execute(query, resp); - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); std::vector> expected{ {"buy"}, @@ -210,7 +210,7 @@ TEST_F(SchemaTest, metaCommunication) { auto code = client->execute(query, resp); ASSERT_NE(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); { cpp2::ExecutionResponse resp; std::string query = "DESCRIBE EDGE education"; @@ -228,7 +228,7 @@ TEST_F(SchemaTest, metaCommunication) { auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); { cpp2::ExecutionResponse resp; std::string query = "USE my_space"; @@ -240,10 +240,10 @@ TEST_F(SchemaTest, metaCommunication) { cpp2::ExecutionResponse resp; std::string query = "CREATE TAG animal(name string, kind string)"; auto code = client->execute(query, resp); - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); { cpp2::ExecutionResponse resp; std::string query = "DESCRIBE TAG animal"; @@ -265,7 +265,7 @@ TEST_F(SchemaTest, metaCommunication) { cpp2::ExecutionResponse resp; std::string query = "SHOW TAGS"; auto code = client->execute(query, resp); - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); std::vector> expected{ {"animal"}, @@ -277,7 +277,7 @@ TEST_F(SchemaTest, metaCommunication) { cpp2::ExecutionResponse resp; std::string query = "REMOVE TAG person "; auto code = client->execute(query, resp); - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } { diff --git a/src/graph/test/TestEnv.cpp b/src/graph/test/TestEnv.cpp index c5e0199bb36..ad1ab5a2b34 100644 --- a/src/graph/test/TestEnv.cpp +++ b/src/graph/test/TestEnv.cpp @@ -7,7 +7,7 @@ #include "base/Base.h" #include "graph/test/TestEnv.h" -DECLARE_int32(load_data_interval_second); +DECLARE_int32(load_data_interval_secs); namespace nebula { namespace graph { @@ -24,13 +24,13 @@ TestEnv::~TestEnv() { void TestEnv::SetUp() { - FLAGS_load_data_interval_second = 1; + FLAGS_load_data_interval_secs = 1; using ThriftServer = apache::thrift::ThriftServer; server_ = std::make_unique(); + server_->getIOThreadPool()->setNumThreads(1); auto interface = std::make_shared(server_->getIOThreadPool()); server_->setInterface(std::move(interface)); server_->setPort(0); // Let the system choose an available port for us - auto serve = [this] { server_->serve(); }; diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp index c247c8c7365..73011c9a796 100644 --- a/src/kvstore/PartManager.cpp +++ b/src/kvstore/PartManager.cpp @@ -35,17 +35,8 @@ bool MemPartManager::partExist(const HostAddr& host, GraphSpaceID spaceId, Parti MetaServerBasedPartManager::MetaServerBasedPartManager(HostAddr host, meta::MetaClient *client) : localHost_(std::move(host)) { - if (nullptr == client) { - LOG(INFO) << "MetaClient is nullptr, create new one"; - // multi instances use one metaclient - static auto clientPtr = std::make_unique(); - static std::once_flag flag; - std::call_once(flag, std::bind(&meta::MetaClient::init, clientPtr.get())); - client_ = clientPtr.get(); - } else { - client_ = client; - } - + client_ = client; + CHECK_NOTNULL(client_); client_->registerListener(this); } diff --git a/src/meta/SchemaManager.cpp b/src/meta/SchemaManager.cpp index 1ce34f7cbc4..349b50ddebb 100644 --- a/src/meta/SchemaManager.cpp +++ b/src/meta/SchemaManager.cpp @@ -6,26 +6,14 @@ #include "base/Base.h" #include "meta/SchemaManager.h" -#include "meta/FileBasedSchemaManager.h" #include "meta/ServerBasedSchemaManager.h" -DECLARE_string(schema_file); -DECLARE_string(meta_server_addrs); - namespace nebula { namespace meta { std::unique_ptr SchemaManager::create() { - if (!FLAGS_schema_file.empty()) { - std::unique_ptr sm(new FileBasedSchemaManager()); - return sm; - } else if (!FLAGS_meta_server_addrs.empty()) { - std::unique_ptr sm(new ServerBasedSchemaManager()); - return sm; - } else { - std::unique_ptr sm(new AdHocSchemaManager()); - return sm; - } + auto sm = std::unique_ptr(new ServerBasedSchemaManager()); + return sm; } void AdHocSchemaManager::addTagSchema(GraphSpaceID space, diff --git a/src/meta/ServerBasedSchemaManager.cpp b/src/meta/ServerBasedSchemaManager.cpp index a014cfcfd83..e9aaffa688f 100644 --- a/src/meta/ServerBasedSchemaManager.cpp +++ b/src/meta/ServerBasedSchemaManager.cpp @@ -17,15 +17,8 @@ ServerBasedSchemaManager::~ServerBasedSchemaManager() { } void ServerBasedSchemaManager::init(MetaClient *client) { - if (nullptr == client) { - LOG(INFO) << "MetaClient is nullptr, create new one"; - static auto clientPtr = std::make_unique(); - static std::once_flag flag; - std::call_once(flag, std::bind(&meta::MetaClient::init, clientPtr.get())); - metaClient_ = clientPtr.get(); - } else { - metaClient_ = client; - } + metaClient_ = client; + CHECK_NOTNULL(metaClient_); } std::shared_ptr diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index 1392f58f47d..b30edf9d579 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -9,26 +9,19 @@ #include "network/NetworkUtils.h" #include "meta/NebulaSchemaProvider.h" -DEFINE_int32(load_data_interval_second, 2 * 60, "Load data interval, unit: second"); -DEFINE_string(meta_server_addrs, "", "list of meta server addresses," - "the format looks like ip1:port1, ip2:port2, ip3:port3"); -DEFINE_int32(meta_client_io_threads, 3, "meta client io threads"); +DEFINE_int32(load_data_interval_secs, 2 * 60, "Load data interval"); +DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval"); namespace nebula { namespace meta { MetaClient::MetaClient(std::shared_ptr ioThreadPool, - std::vector addrs) + std::vector addrs, + bool sendHeartBeat) : ioThreadPool_(ioThreadPool) - , addrs_(std::move(addrs)) { - if (ioThreadPool_ == nullptr) { - ioThreadPool_ - = std::make_shared(FLAGS_meta_client_io_threads); - } - if (addrs_.empty() && !FLAGS_meta_server_addrs.empty()) { - addrs_ = network::NetworkUtils::toHosts(FLAGS_meta_server_addrs); - } - CHECK(!addrs_.empty()); + , addrs_(std::move(addrs)) + , sendHeartBeat_(sendHeartBeat) { + CHECK(ioThreadPool_ != nullptr && !addrs_.empty()); clientsMan_ = std::make_shared>(); updateHost(); @@ -36,21 +29,47 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool } MetaClient::~MetaClient() { - loadDataThread_.stop(); - loadDataThread_.wait(); + bgThread_.stop(); + bgThread_.wait(); VLOG(3) << "~MetaClient"; } void MetaClient::init() { loadDataThreadFunc(); - CHECK(loadDataThread_.start()); - size_t delayMS = FLAGS_load_data_interval_second * 1000 + folly::Random::rand32(900); - loadDataThread_.addTimerTask(delayMS, - FLAGS_load_data_interval_second * 1000, - &MetaClient::loadDataThreadFunc, this); + CHECK(bgThread_.start()); + { + size_t delayMS = FLAGS_load_data_interval_secs * 1000 + folly::Random::rand32(900); + LOG(INFO) << "Register timer task for load data!"; + bgThread_.addTimerTask(delayMS, + FLAGS_load_data_interval_secs * 1000, + &MetaClient::loadDataThreadFunc, this); + } + if (sendHeartBeat_) { + LOG(INFO) << "Register time task for heartbeat!"; + size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900); + bgThread_.addTimerTask(delayMS, + FLAGS_heartbeat_interval_secs * 1000, + &MetaClient::heartBeatThreadFunc, this); + } +} + +void MetaClient::heartBeatThreadFunc() { + if (listener_ == nullptr) { + VLOG(1) << "Can't send heartbeat due to listener_ is nullptr!"; + return; + } + auto ret = heartbeat().get(); + if (!ret.ok()) { + LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); + return; + } } void MetaClient::loadDataThreadFunc() { + if (ioThreadPool_->numThreads() <= 0) { + LOG(ERROR) << "The threads number in ioThreadPool should be greater than 0"; + return; + } auto ret = listSpaces().get(); if (!ret.ok()) { LOG(ERROR) << "List space failed, status:" << ret.status(); @@ -236,7 +255,7 @@ std::vector MetaClient::to(const std::vector& std::vector MetaClient::toSpaceIdName(const std::vector& tIdNames) { std::vector idNames; idNames.resize(tIdNames.size()); - std::transform(tIdNames.begin(), tIdNames.end(), idNames.begin(), [] (const auto& tin) { + std::transform(tIdNames.begin(), tIdNames.end(), idNames.begin(), [](const auto& tin) { return SpaceIdName(tin.id.get_space_id(), tin.name); }); return idNames; @@ -826,5 +845,20 @@ SchemaVer MetaClient::getNewestEdgeVerFromCache(const GraphSpaceID& space, return it->second; } +folly::Future> MetaClient::heartbeat() { + CHECK_NOTNULL(listener_); + auto localHost = listener_->getLocalHost(); + cpp2::HBReq req; + nebula::cpp2::HostAddr thriftHost; + thriftHost.set_ip(localHost.first); + thriftHost.set_port(localHost.second); + req.set_host(std::move(thriftHost)); + return getResponse(std::move(req), [] (auto client, auto request) { + return client->future_heartBeat(request); + }, [] (cpp2::HBResp&& resp) -> bool { + return resp.code == cpp2::ErrorCode::SUCCEEDED; + }, true); +} + } // namespace meta } // namespace nebula diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index ce7c36e6d8d..e3f20bd2ea7 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -59,8 +59,9 @@ class MetaChangedListener { class MetaClient { public: - explicit MetaClient(std::shared_ptr ioThreadPool = nullptr, - std::vector addrs = {}); + explicit MetaClient(std::shared_ptr ioThreadPool, + std::vector addrs, + bool sendHeartBeat = false); virtual ~MetaClient(); @@ -184,6 +185,8 @@ class MetaClient { protected: void loadDataThreadFunc(); + void heartBeatThreadFunc(); + bool loadSchemas(GraphSpaceID spaceId, std::shared_ptr spaceInfoCache, SpaceTagNameIdMap &tagNameIdMap, @@ -191,6 +194,8 @@ class MetaClient { SpaceNewestTagVerMap &newestTagVerMap, SpaceNewestEdgeVerMap &newestEdgeVerMap); + folly::Future> heartbeat(); + std::unordered_map> reverse(const PartsAlloc& parts); void updateHost() { @@ -230,20 +235,21 @@ class MetaClient { private: std::shared_ptr ioThreadPool_; std::shared_ptr> clientsMan_; + std::unordered_map> localCache_; std::vector addrs_; // The lock used to protect active_ and leader_. folly::RWSpinLock hostLock_; HostAddr active_; HostAddr leader_; - thread::GenericWorker loadDataThread_; - std::unordered_map> localCache_; + thread::GenericWorker bgThread_; SpaceNameIdMap spaceIndexByName_; SpaceTagNameIdMap spaceTagIndexByName_; SpaceEdgeNameTypeMap spaceEdgeIndexByName_; SpaceNewestTagVerMap spaceNewestTagVerMap_; SpaceNewestEdgeVerMap spaceNewestEdgeVerMap_; - folly::RWSpinLock localCacheLock_; - MetaChangedListener* listener_{nullptr}; + folly::RWSpinLock localCacheLock_; + MetaChangedListener* listener_{nullptr}; + bool sendHeartBeat_ = false; }; } // namespace meta } // namespace nebula diff --git a/src/meta/processors/HBProcessor.h b/src/meta/processors/HBProcessor.h index a48c67c9e40..ab31834ad48 100644 --- a/src/meta/processors/HBProcessor.h +++ b/src/meta/processors/HBProcessor.h @@ -20,6 +20,7 @@ namespace meta { class HBProcessor : public BaseProcessor { FRIEND_TEST(HBProcessorTest, HBTest); + FRIEND_TEST(MetaClientTest, HeartbeatTest); public: static HBProcessor* instance(kvstore::KVStore* kvstore) { diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index f5f2c76dd6a..dfcc0b6381f 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -14,8 +14,10 @@ #include "meta/MetaServiceUtils.h" #include "meta/ServerBasedSchemaManager.h" #include "dataman/ResultSchemaProvider.h" +#include "meta/processors/HBProcessor.h" -DECLARE_int32(load_data_interval_second); +DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace meta { @@ -25,7 +27,7 @@ using nebula::cpp2::ValueType; using apache::thrift::FragileConstructor::FRAGILE; TEST(MetaClientTest, InterfacesTest) { - FLAGS_load_data_interval_second = 1; + FLAGS_load_data_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); auto sc = TestUtils::mockServer(10001, rootPath.path()); @@ -106,7 +108,7 @@ TEST(MetaClientTest, InterfacesTest) { ASSERT_EQ(ret1.value().begin()->schema.columns.size(), 5); // getTagSchemaFromCache - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); auto ver = client->getNewestTagVerFromCache(spaceId, ret1.value().begin()->tag_id); auto ret2 = client->getTagSchemaFromCache(spaceId, @@ -156,7 +158,7 @@ TEST(MetaClientTest, InterfacesTest) { ASSERT_STREQ("edgeItem0", outSchema1->getFieldName(0)); } } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); { // Test cache interfaces // For Host(0, 0) the parts should be 2, 3, 4, 6, 7, 8 @@ -263,7 +265,7 @@ TEST(MetaClientTest, InterfacesTest) { } TEST(MetaClientTest, TagTest) { - FLAGS_load_data_interval_second = 1; + FLAGS_load_data_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTagTest.XXXXXX"); auto sc = TestUtils::mockServer(10001, rootPath.path()); @@ -356,7 +358,7 @@ class TestListener : public MetaChangedListener { }; TEST(MetaClientTest, DiffTest) { - FLAGS_load_data_interval_second = 1; + FLAGS_load_data_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); auto sc = TestUtils::mockServer(10001, rootPath.path()); @@ -382,14 +384,14 @@ TEST(MetaClientTest, DiffTest) { auto ret = client->createSpace("default_space", 9, 1).get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); ASSERT_EQ(1, listener->spaceNum); ASSERT_EQ(9, listener->partNum); { auto ret = client->createSpace("default_space_1", 5, 1).get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); ASSERT_EQ(2, listener->spaceNum); ASSERT_EQ(14, listener->partNum); { @@ -397,11 +399,39 @@ TEST(MetaClientTest, DiffTest) { auto ret = client->dropSpace("default_space_1").get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_second + 1); + sleep(FLAGS_load_data_interval_secs + 1); ASSERT_EQ(1, listener->spaceNum); ASSERT_EQ(9, listener->partNum); } +TEST(MetaClientTest, HeartbeatTest) { + FLAGS_load_data_interval_secs = 5; + FLAGS_heartbeat_interval_secs = 1; + fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); + auto sc = TestUtils::mockServer(10001, rootPath.path()); + + auto threadPool = std::make_shared(1); + uint32_t localIp; + network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); + auto listener = std::make_unique(); + auto client = std::make_shared(threadPool, + std::vector{HostAddr(localIp, 10001)}, + true); // send heartbeat + client->registerListener(listener.get()); + client->init(); + { + // Test addHost, listHosts interface. + std::vector hosts = {{0, 0}}; + auto r = client->addHosts(hosts).get(); + ASSERT_TRUE(r.ok()); + auto ret = client->listHosts().get(); + ASSERT_TRUE(ret.ok()); + ASSERT_EQ(hosts, ret.value()); + } + sleep(FLAGS_heartbeat_interval_secs + 1); + ASSERT_EQ(1, HBProcessor::hostsMan()->getActiveHosts().size()); +} + } // namespace meta } // namespace nebula diff --git a/src/meta/test/MetaHttpHandlerTest.cpp b/src/meta/test/MetaHttpHandlerTest.cpp index 2967be6670b..747278087a6 100644 --- a/src/meta/test/MetaHttpHandlerTest.cpp +++ b/src/meta/test/MetaHttpHandlerTest.cpp @@ -13,7 +13,7 @@ #include "meta/test/TestUtils.h" #include "fs/TempDir.h" -DECLARE_int32(load_data_interval_second); +DECLARE_int32(load_data_interval_secs); DECLARE_string(pid_file); namespace nebula { @@ -42,7 +42,7 @@ class MetaHttpHandlerTestEnv : public ::testing::Environment { TEST(MetaHttpHandlerTest, MetaStatusTest) { - FLAGS_load_data_interval_second = 1; + FLAGS_load_data_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); auto sc = TestUtils::mockServer(10001, rootPath.path()); diff --git a/src/storage/client/StorageClient.cpp b/src/storage/client/StorageClient.cpp index 60ea917dcd2..6e2f6ce2f17 100644 --- a/src/storage/client/StorageClient.cpp +++ b/src/storage/client/StorageClient.cpp @@ -15,16 +15,9 @@ namespace storage { StorageClient::StorageClient(std::shared_ptr threadPool, meta::MetaClient *client) - : ioThreadPool_(threadPool) { - if (nullptr == client) { - LOG(INFO) << "MetaClient is nullptr, create new one"; - static auto clientPtr = std::make_unique(); - static std::once_flag flag; - std::call_once(flag, std::bind(&meta::MetaClient::init, clientPtr.get())); - client_ = clientPtr.get(); - } else { - client_ = client; - } + : ioThreadPool_(threadPool) + , client_(client) { + CHECK_NOTNULL(client_); clientsMan_ = std::make_unique>(); } diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp index 322fe870c2c..0dd4bed2f2a 100644 --- a/src/storage/test/StorageClientTest.cpp +++ b/src/storage/test/StorageClientTest.cpp @@ -15,39 +15,44 @@ #include "dataman/RowSetReader.h" DECLARE_string(meta_server_addrs); -DECLARE_int32(load_data_interval_second); +DECLARE_int32(load_data_interval_secs); namespace nebula { namespace storage { TEST(StorageClientTest, VerticesInterfacesTest) { - FLAGS_load_data_interval_second = 1; + FLAGS_load_data_interval_secs = 1; fs::TempDir rootPath("/tmp/StorageClientTest.XXXXXX"); GraphSpaceID spaceId = 0; uint32_t localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); uint32_t localMetaPort = 10001; uint32_t localDataPort = 20002; - FLAGS_meta_server_addrs = folly::stringPrintf("127.0.0.1:%d", localMetaPort); LOG(INFO) << "Start meta server...."; std::string metaPath = folly::stringPrintf("%s/meta", rootPath.path()); auto metaServerContext = meta::TestUtils::mockServer(10001, metaPath.c_str()); + LOG(INFO) << "Create meta client..."; + auto threadPool = std::make_shared(1); + auto addrsRet + = network::NetworkUtils::toHosts(folly::stringPrintf("127.0.0.1:%d", localMetaPort)); + CHECK(addrsRet.ok()) << addrsRet.status(); + auto mClient + = std::make_unique(threadPool, std::move(addrsRet.value()), true); + mClient->init(); + LOG(INFO) << "Start data server...."; std::string dataPath = folly::stringPrintf("%s/data", rootPath.path()); - auto sc = TestUtils::mockServer(dataPath.c_str(), localIp, localDataPort); + auto sc = TestUtils::mockServer(mClient.get(), dataPath.c_str(), localIp, localDataPort); LOG(INFO) << "Add hosts and create space...."; - auto mClient = std::make_unique(); - mClient->init(); auto r = mClient->addHosts({HostAddr(localIp, localDataPort)}).get(); ASSERT_TRUE(r.ok()); auto ret = mClient->createSpace("default", 10, 1).get(); spaceId = ret.value(); - sleep(2 * FLAGS_load_data_interval_second + 1); + sleep(2 * FLAGS_load_data_interval_secs + 1); - auto threadPool = std::make_shared(1); auto client = std::make_unique(threadPool, mClient.get()); // VerticesInterfacesTest(addVertices and getVertexProps) { diff --git a/src/storage/test/StorageHttpHandlerTest.cpp b/src/storage/test/StorageHttpHandlerTest.cpp index 81f34ef724c..3d7e5750b35 100644 --- a/src/storage/test/StorageHttpHandlerTest.cpp +++ b/src/storage/test/StorageHttpHandlerTest.cpp @@ -15,7 +15,7 @@ #include "fs/TempDir.h" DECLARE_string(meta_server_addrs); -DECLARE_int32(load_data_interval_second); +DECLARE_int32(load_data_interval_secs); namespace nebula { @@ -43,22 +43,29 @@ class StorageHttpHandlerTestEnv : public ::testing::Environment { TEST(StoragehHttpHandlerTest, StorageStatusTest) { - FLAGS_load_data_interval_second = 1; + FLAGS_load_data_interval_secs = 1; fs::TempDir rootPath("/tmp/StorageClientTest.XXXXXX"); uint32_t localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); uint32_t localMetaPort = 10001; uint32_t localDataPort = 20002; - FLAGS_meta_server_addrs = folly::stringPrintf("127.0.0.1:%d", localMetaPort); LOG(INFO) << "Start meta server...."; std::string metaPath = folly::stringPrintf("%s/meta", rootPath.path()); auto metaServerContext = meta::TestUtils::mockServer(localMetaPort, metaPath.c_str()); LOG(INFO) << "Start storage server...."; + auto threadPool = std::make_shared(1); + auto addrsRet + = network::NetworkUtils::toHosts(folly::stringPrintf("127.0.0.1:%d", localMetaPort)); + CHECK(addrsRet.ok()) << addrsRet.status(); + auto mClient + = std::make_unique(threadPool, std::move(addrsRet.value()), true); + mClient->init(); std::string dataPath = folly::stringPrintf("%s/data", rootPath.path()); - auto sc = TestUtils::mockServer(dataPath.c_str(), + auto sc = TestUtils::mockServer(mClient.get(), + dataPath.c_str(), localIp, localDataPort); diff --git a/src/storage/test/TestUtils.h b/src/storage/test/TestUtils.h index 44a5cf6589d..c9ad7d64ccc 100644 --- a/src/storage/test/TestUtils.h +++ b/src/storage/test/TestUtils.h @@ -165,7 +165,8 @@ class TestUtils { uint32_t port_; }; - static std::unique_ptr mockServer(const char* dataPath, + static std::unique_ptr mockServer(meta::MetaClient* mClient, + const char* dataPath, uint32_t ip, uint32_t port = 0) { auto sc = std::make_unique(); @@ -178,7 +179,7 @@ class TestUtils { options.local_ = HostAddr(ip, port); options.dataPaths_ = std::move(paths); options.partMan_ - = std::make_unique(options.local_); + = std::make_unique(options.local_, mClient); kvstore::NebulaStore* kvPtr = static_cast( kvstore::KVStore::instance(std::move(options))); std::unique_ptr kv(kvPtr);