From 8ce57b977d62baeae3a4424d238eb44c769bf651 Mon Sep 17 00:00:00 2001 From: heroicNeZha <25311962+heroicNeZha@users.noreply.github.com> Date: Thu, 23 Dec 2021 10:42:33 +0800 Subject: [PATCH 1/4] simplify graph signal handler --- src/daemons/GraphDaemon.cpp | 57 ++++------------ src/graph/service/CMakeLists.txt | 1 + src/graph/service/GraphServer.cpp | 104 ++++++++++++++++++++++++++++++ src/graph/service/GraphServer.h | 45 +++++++++++++ 4 files changed, 161 insertions(+), 46 deletions(-) create mode 100644 src/graph/service/GraphServer.cpp create mode 100644 src/graph/service/GraphServer.h diff --git a/src/daemons/GraphDaemon.cpp b/src/daemons/GraphDaemon.cpp index 78cfdd36370..303df92016d 100644 --- a/src/daemons/GraphDaemon.cpp +++ b/src/daemons/GraphDaemon.cpp @@ -10,13 +10,13 @@ #include #include "common/base/Base.h" -#include "common/base/SignalHandler.h" #include "common/fs/FileUtils.h" #include "common/network/NetworkUtils.h" #include "common/process/ProcessUtils.h" #include "common/ssl/SSLConfig.h" #include "common/time/TimezoneInfo.h" #include "graph/service/GraphFlags.h" +#include "graph/service/GraphServer.h" #include "graph/service/GraphService.h" #include "graph/stats/StatsDef.h" #include "version/Version.h" @@ -29,13 +29,10 @@ using nebula::fs::FileUtils; using nebula::graph::GraphService; using nebula::network::NetworkUtils; -static std::unique_ptr gServer; - static void signalHandler(int sig); static Status setupSignalHandler(); extern Status setupLogging(); static void printHelp(const char *prog); -static void setupThreadManager(); #if defined(__x86_64__) extern Status setupBreakpad(); #endif @@ -43,6 +40,8 @@ extern Status setupBreakpad(); DECLARE_string(flagfile); DECLARE_bool(containerized); +std::unique_ptr gServer; + int main(int argc, char *argv[]) { google::SetVersionString(nebula::versionString()); if (argc == 1) { @@ -153,30 +152,6 @@ int main(int argc, char *argv[]) { } LOG(INFO) << "Number of worker threads: " << FLAGS_num_worker_threads; - auto threadFactory = std::make_shared("graph-netio"); - auto ioThreadPool = std::make_shared(FLAGS_num_netio_threads, - std::move(threadFactory)); - gServer = std::make_unique(); - gServer->setIOThreadPool(ioThreadPool); - - auto interface = std::make_shared(); - status = interface->init(ioThreadPool, localhost); - if (!status.ok()) { - LOG(ERROR) << status; - return EXIT_FAILURE; - } - - gServer->setPort(localhost.port); - gServer->setInterface(std::move(interface)); - gServer->setReusePort(FLAGS_reuse_port); - gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs)); - gServer->setNumAcceptThreads(FLAGS_num_accept_threads); - gServer->setListenBacklog(FLAGS_listen_backlog); - if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) { - gServer->setSSLConfig(nebula::sslContextConfig()); - } - setupThreadManager(); - // Setup the signal handlers status = setupSignalHandler(); if (!status.ok()) { @@ -184,16 +159,16 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } - FLOG_INFO("Starting nebula-graphd on %s:%d\n", localhost.host.c_str(), localhost.port); - try { - gServer->serve(); // Blocking wait until shut down via gServer->stop() - } catch (const std::exception &e) { - FLOG_ERROR("Exception thrown while starting the RPC server: %s", e.what()); + gServer = std::make_unique(localhost); + + if (!gServer->start()) { + LOG(ERROR) << "The graph server start failed"; + gServer->stop(); return EXIT_FAILURE; } - FLOG_INFO("nebula-graphd on %s:%d has been stopped", localhost.host.c_str(), localhost.port); - + gServer->waitUntilStop(); + LOG(INFO) << "The graph Daemon stopped"; return EXIT_SUCCESS; } @@ -208,7 +183,7 @@ void signalHandler(int sig) { case SIGINT: case SIGTERM: FLOG_INFO("Signal %d(%s) received, stopping this server", sig, ::strsignal(sig)); - gServer->stop(); + gServer->notifyStop(); break; default: FLOG_ERROR("Signal %d(%s) received but ignored", sig, ::strsignal(sig)); @@ -216,13 +191,3 @@ void signalHandler(int sig) { } void printHelp(const char *prog) { fprintf(stderr, "%s --flagfile \n", prog); } - -void setupThreadManager() { - int numThreads = - FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads : gServer->getNumIOWorkerThreads(); - std::shared_ptr threadManager( - PriorityThreadManager::newPriorityThreadManager(numThreads, false /*stats*/)); - threadManager->setNamePrefix("executor"); - threadManager->start(); - gServer->setThreadManager(threadManager); -} diff --git a/src/graph/service/CMakeLists.txt b/src/graph/service/CMakeLists.txt index c63f1535069..2ff0c6406dc 100644 --- a/src/graph/service/CMakeLists.txt +++ b/src/graph/service/CMakeLists.txt @@ -10,6 +10,7 @@ nebula_add_library( nebula_add_library( service_obj OBJECT GraphService.cpp + GraphServer.cpp ) nebula_add_library( diff --git a/src/graph/service/GraphServer.cpp b/src/graph/service/GraphServer.cpp new file mode 100644 index 00000000000..3d7b68f4f83 --- /dev/null +++ b/src/graph/service/GraphServer.cpp @@ -0,0 +1,104 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ +#include "GraphServer.h" + +#include + +#include "graph/service/GraphFlags.h" +#include "graph/service/GraphService.h" +namespace nebula { +namespace graph { + +GraphServer::GraphServer(HostAddr localHost) : localHost_(localHost) {} + +GraphServer::~GraphServer() { stop(); } + +bool GraphServer::start() { + auto threadFactory = std::make_shared("graph-netio"); + auto ioThreadPool = std::make_shared(FLAGS_num_netio_threads, + std::move(threadFactory)); + int numThreads = + FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads : gServer_->getNumIOWorkerThreads(); + std::shared_ptr threadManager( + PriorityThreadManager::newPriorityThreadManager(numThreads, false /*stats*/)); + threadManager->setNamePrefix("executor"); + threadManager->start(); + + gServer_ = std::make_unique(); + gServer_->setIOThreadPool(ioThreadPool); + + auto interface = std::make_shared(); + auto status = interface->init(ioThreadPool, localHost_); + if (!status.ok()) { + LOG(ERROR) << status; + return false; + } + + graphThread_ = std::make_unique([&] { + gServer_->setPort(localHost_.port); + gServer_->setInterface(std::move(interface)); + gServer_->setReusePort(FLAGS_reuse_port); + gServer_->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs)); + gServer_->setNumAcceptThreads(FLAGS_num_accept_threads); + gServer_->setListenBacklog(FLAGS_listen_backlog); + if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) { + gServer_->setSSLConfig(nebula::sslContextConfig()); + } + gServer_->setThreadManager(threadManager); + + serverStatus_.store(STATUS_RUNNING); + FLOG_INFO("Starting nebula-graphd on %s:%d\n", localHost_.host.c_str(), localHost_.port); + try { + gServer_->serve(); // Blocking wait until shut down via gServer_->stop() + } catch (const std::exception &e) { + FLOG_ERROR("Exception thrown while starting the graph RPC server: %s", e.what()); + } + serverStatus_.store(STATUS_STOPPED); + FLOG_INFO("nebula-graphd on %s:%d has been stopped", localHost_.host.c_str(), localHost_.port); + }); + + while (serverStatus_ == STATUS_UNINITIALIZED) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + return true; +} + +void GraphServer::waitUntilStop() { + { + std::unique_lock lkStop(muStop_); + while (serverStatus_ == STATUS_RUNNING) { + cvStop_.wait(lkStop); + } + } + + gServer_->stop(); + + graphThread_->join(); +} + +void GraphServer::notifyStop() { + std::unique_lock lkStop(muStop_); + if (serverStatus_ == STATUS_RUNNING) { + serverStatus_ = STATUS_STOPPED; + cvStop_.notify_one(); + } +} + +void GraphServer::stop() { + if (serverStatus_.load() == ServiceStatus::STATUS_STOPPED) { + LOG(INFO) << "The graph server has been stopped"; + return; + } + + ServiceStatus serverExpected = ServiceStatus::STATUS_RUNNING; + serverStatus_.compare_exchange_strong(serverExpected, STATUS_STOPPED); + + if (gServer_) { + gServer_->stop(); + } +} + +} // namespace graph +} // namespace nebula diff --git a/src/graph/service/GraphServer.h b/src/graph/service/GraphServer.h new file mode 100644 index 00000000000..cf451638b53 --- /dev/null +++ b/src/graph/service/GraphServer.h @@ -0,0 +1,45 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ +#include + +#include +#include + +#include "common/base/Base.h" +#include "common/base/SignalHandler.h" +#include "common/network/NetworkUtils.h" +namespace nebula { +namespace graph { +class GraphServer { + public: + explicit GraphServer(HostAddr localHost); + + ~GraphServer(); + + // Return false if failed. + bool start(); + + void stop(); + + // used for signal handler to set an internal stop flag + void notifyStop(); + + void waitUntilStop(); + + private: + HostAddr localHost_; + + std::shared_ptr ioThreadPool_; + std::shared_ptr workers_; + std::unique_ptr gServer_; + std::unique_ptr graphThread_; + + enum ServiceStatus { STATUS_UNINITIALIZED = 0, STATUS_RUNNING = 1, STATUS_STOPPED = 2 }; + std::atomic serverStatus_{STATUS_UNINITIALIZED}; + std::mutex muStop_; + std::condition_variable cvStop_; +}; +} // namespace graph +} // namespace nebula From 85896a638637f950df098f4520c36e1dbcd5f4fc Mon Sep 17 00:00:00 2001 From: heroicNeZha <25311962+heroicNeZha@users.noreply.github.com> Date: Mon, 27 Dec 2021 16:36:24 +0800 Subject: [PATCH 2/4] style --- src/graph/service/GraphServer.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/graph/service/GraphServer.cpp b/src/graph/service/GraphServer.cpp index 3d7b68f4f83..8a6b8d18a89 100644 --- a/src/graph/service/GraphServer.cpp +++ b/src/graph/service/GraphServer.cpp @@ -13,7 +13,9 @@ namespace graph { GraphServer::GraphServer(HostAddr localHost) : localHost_(localHost) {} -GraphServer::~GraphServer() { stop(); } +GraphServer::~GraphServer() { + stop(); +} bool GraphServer::start() { auto threadFactory = std::make_shared("graph-netio"); From c77e135b02bbbfcaff2562ea504d232250ee604a Mon Sep 17 00:00:00 2001 From: heroicNeZha <25311962+heroicNeZha@users.noreply.github.com> Date: Mon, 27 Dec 2021 16:36:24 +0800 Subject: [PATCH 3/4] style --- src/graph/service/GraphServer.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/graph/service/GraphServer.cpp b/src/graph/service/GraphServer.cpp index 3d7b68f4f83..4c1ceb37e1a 100644 --- a/src/graph/service/GraphServer.cpp +++ b/src/graph/service/GraphServer.cpp @@ -13,7 +13,9 @@ namespace graph { GraphServer::GraphServer(HostAddr localHost) : localHost_(localHost) {} -GraphServer::~GraphServer() { stop(); } +GraphServer::~GraphServer() { + stop(); +} bool GraphServer::start() { auto threadFactory = std::make_shared("graph-netio"); @@ -22,7 +24,7 @@ bool GraphServer::start() { int numThreads = FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads : gServer_->getNumIOWorkerThreads(); std::shared_ptr threadManager( - PriorityThreadManager::newPriorityThreadManager(numThreads, false /*stats*/)); + PriorityThreadManager::newPriorityThreadManager(numThreads)); threadManager->setNamePrefix("executor"); threadManager->start(); From 7ba1b71dd6a1e2583fa30bb339c32a80b8c1ab6a Mon Sep 17 00:00:00 2001 From: heroicNeZha <25311962+heroicNeZha@users.noreply.github.com> Date: Tue, 28 Dec 2021 11:20:57 +0800 Subject: [PATCH 4/4] fix some comment --- src/daemons/GraphDaemon.cpp | 1 - src/graph/service/GraphServer.cpp | 39 +++++++++++++++---------------- src/graph/service/GraphServer.h | 5 ++-- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/src/daemons/GraphDaemon.cpp b/src/daemons/GraphDaemon.cpp index afea4761475..b6b25364891 100644 --- a/src/daemons/GraphDaemon.cpp +++ b/src/daemons/GraphDaemon.cpp @@ -163,7 +163,6 @@ int main(int argc, char *argv[]) { if (!gServer->start()) { LOG(ERROR) << "The graph server start failed"; - gServer->stop(); return EXIT_FAILURE; } diff --git a/src/graph/service/GraphServer.cpp b/src/graph/service/GraphServer.cpp index 4c1ceb37e1a..25e6ee6cc5e 100644 --- a/src/graph/service/GraphServer.cpp +++ b/src/graph/service/GraphServer.cpp @@ -5,13 +5,14 @@ #include "GraphServer.h" #include +#include #include "graph/service/GraphFlags.h" #include "graph/service/GraphService.h" namespace nebula { namespace graph { -GraphServer::GraphServer(HostAddr localHost) : localHost_(localHost) {} +GraphServer::GraphServer(HostAddr localHost) : localHost_(std::move(localHost)) {} GraphServer::~GraphServer() { stop(); @@ -21,15 +22,15 @@ bool GraphServer::start() { auto threadFactory = std::make_shared("graph-netio"); auto ioThreadPool = std::make_shared(FLAGS_num_netio_threads, std::move(threadFactory)); - int numThreads = - FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads : gServer_->getNumIOWorkerThreads(); + int numThreads = FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads + : thriftServer_->getNumIOWorkerThreads(); std::shared_ptr threadManager( PriorityThreadManager::newPriorityThreadManager(numThreads)); threadManager->setNamePrefix("executor"); threadManager->start(); - gServer_ = std::make_unique(); - gServer_->setIOThreadPool(ioThreadPool); + thriftServer_ = std::make_unique(); + thriftServer_->setIOThreadPool(ioThreadPool); auto interface = std::make_shared(); auto status = interface->init(ioThreadPool, localHost_); @@ -39,21 +40,21 @@ bool GraphServer::start() { } graphThread_ = std::make_unique([&] { - gServer_->setPort(localHost_.port); - gServer_->setInterface(std::move(interface)); - gServer_->setReusePort(FLAGS_reuse_port); - gServer_->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs)); - gServer_->setNumAcceptThreads(FLAGS_num_accept_threads); - gServer_->setListenBacklog(FLAGS_listen_backlog); + thriftServer_->setPort(localHost_.port); + thriftServer_->setInterface(std::move(interface)); + thriftServer_->setReusePort(FLAGS_reuse_port); + thriftServer_->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs)); + thriftServer_->setNumAcceptThreads(FLAGS_num_accept_threads); + thriftServer_->setListenBacklog(FLAGS_listen_backlog); if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) { - gServer_->setSSLConfig(nebula::sslContextConfig()); + thriftServer_->setSSLConfig(nebula::sslContextConfig()); } - gServer_->setThreadManager(threadManager); + thriftServer_->setThreadManager(threadManager); serverStatus_.store(STATUS_RUNNING); FLOG_INFO("Starting nebula-graphd on %s:%d\n", localHost_.host.c_str(), localHost_.port); try { - gServer_->serve(); // Blocking wait until shut down via gServer_->stop() + thriftServer_->serve(); // Blocking wait until shut down via thriftServer_->stop() } catch (const std::exception &e) { FLOG_ERROR("Exception thrown while starting the graph RPC server: %s", e.what()); } @@ -70,12 +71,10 @@ bool GraphServer::start() { void GraphServer::waitUntilStop() { { std::unique_lock lkStop(muStop_); - while (serverStatus_ == STATUS_RUNNING) { - cvStop_.wait(lkStop); - } + cvStop_.wait(lkStop, [&] { return serverStatus_ != STATUS_RUNNING; }); } - gServer_->stop(); + thriftServer_->stop(); graphThread_->join(); } @@ -97,8 +96,8 @@ void GraphServer::stop() { ServiceStatus serverExpected = ServiceStatus::STATUS_RUNNING; serverStatus_.compare_exchange_strong(serverExpected, STATUS_STOPPED); - if (gServer_) { - gServer_->stop(); + if (thriftServer_) { + thriftServer_->stop(); } } diff --git a/src/graph/service/GraphServer.h b/src/graph/service/GraphServer.h index cf451638b53..671c0720605 100644 --- a/src/graph/service/GraphServer.h +++ b/src/graph/service/GraphServer.h @@ -4,6 +4,7 @@ */ #include +#include #include #include @@ -33,10 +34,10 @@ class GraphServer { std::shared_ptr ioThreadPool_; std::shared_ptr workers_; - std::unique_ptr gServer_; + std::unique_ptr thriftServer_; std::unique_ptr graphThread_; - enum ServiceStatus { STATUS_UNINITIALIZED = 0, STATUS_RUNNING = 1, STATUS_STOPPED = 2 }; + enum ServiceStatus : uint8_t { STATUS_UNINITIALIZED = 0, STATUS_RUNNING = 1, STATUS_STOPPED = 2 }; std::atomic serverStatus_{STATUS_UNINITIALIZED}; std::mutex muStop_; std::condition_variable cvStop_;