diff --git a/src/daemons/GraphDaemon.cpp b/src/daemons/GraphDaemon.cpp index 798b9dd2e63..40fae990481 100644 --- a/src/daemons/GraphDaemon.cpp +++ b/src/daemons/GraphDaemon.cpp @@ -10,7 +10,6 @@ #include #include "common/base/Base.h" -#include "common/base/SignalHandler.h" #include "common/fs/FileUtils.h" #include "common/network/NetworkUtils.h" #include "common/process/ProcessUtils.h" @@ -18,6 +17,7 @@ #include "common/time/TimezoneInfo.h" #include "daemons/SetupLogging.h" #include "graph/service/GraphFlags.h" +#include "graph/service/GraphServer.h" #include "graph/service/GraphService.h" #include "graph/stats/GraphStats.h" #include "version/Version.h" @@ -30,12 +30,9 @@ using nebula::fs::FileUtils; using nebula::graph::GraphService; using nebula::network::NetworkUtils; -static std::unique_ptr gServer; - static void signalHandler(int sig); static Status setupSignalHandler(); static void printHelp(const char *prog); -static void setupThreadManager(); extern Status setupAuditLog(); #if defined(__x86_64__) extern Status setupBreakpad(); @@ -44,6 +41,8 @@ extern Status setupBreakpad(); DECLARE_string(flagfile); DECLARE_bool(containerized); +std::unique_ptr gServer; + int main(int argc, char *argv[]) { google::SetVersionString(nebula::versionString()); if (argc == 1) { @@ -164,30 +163,6 @@ int main(int argc, char *argv[]) { } LOG(INFO) << "Number of worker threads: " << FLAGS_num_worker_threads; - auto threadFactory = std::make_shared("graph-netio"); - auto ioThreadPool = std::make_shared(FLAGS_num_netio_threads, - std::move(threadFactory)); - gServer = std::make_unique(); - gServer->setIOThreadPool(ioThreadPool); - - auto interface = std::make_shared(); - status = interface->init(ioThreadPool, localhost); - if (!status.ok()) { - LOG(ERROR) << status; - return EXIT_FAILURE; - } - - gServer->setPort(localhost.port); - gServer->setInterface(std::move(interface)); - gServer->setReusePort(FLAGS_reuse_port); - gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs)); - gServer->setNumAcceptThreads(FLAGS_num_accept_threads); - gServer->setListenBacklog(FLAGS_listen_backlog); - if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) { - gServer->setSSLConfig(nebula::sslContextConfig()); - } - setupThreadManager(); - // Setup the signal handlers status = setupSignalHandler(); if (!status.ok()) { @@ -195,16 +170,15 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } - FLOG_INFO("Starting nebula-graphd on %s:%d\n", localhost.host.c_str(), localhost.port); - try { - gServer->serve(); // Blocking wait until shut down via gServer->stop() - } catch (const std::exception &e) { - FLOG_ERROR("Exception thrown while starting the RPC server: %s", e.what()); + gServer = std::make_unique(localhost); + + if (!gServer->start()) { + LOG(ERROR) << "The graph server start failed"; return EXIT_FAILURE; } - FLOG_INFO("nebula-graphd on %s:%d has been stopped", localhost.host.c_str(), localhost.port); - + gServer->waitUntilStop(); + LOG(INFO) << "The graph Daemon stopped"; return EXIT_SUCCESS; } @@ -219,7 +193,7 @@ void signalHandler(int sig) { case SIGINT: case SIGTERM: FLOG_INFO("Signal %d(%s) received, stopping this server", sig, ::strsignal(sig)); - gServer->stop(); + gServer->notifyStop(); break; default: FLOG_ERROR("Signal %d(%s) received but ignored", sig, ::strsignal(sig)); @@ -229,13 +203,3 @@ void signalHandler(int sig) { void printHelp(const char *prog) { fprintf(stderr, "%s --flagfile \n", prog); } - -void setupThreadManager() { - int numThreads = - FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads : gServer->getNumIOWorkerThreads(); - std::shared_ptr threadManager( - PriorityThreadManager::newPriorityThreadManager(numThreads)); - threadManager->setNamePrefix("executor"); - threadManager->start(); - gServer->setThreadManager(threadManager); -} diff --git a/src/graph/service/CMakeLists.txt b/src/graph/service/CMakeLists.txt index 51221d6fbee..247ebb95e55 100644 --- a/src/graph/service/CMakeLists.txt +++ b/src/graph/service/CMakeLists.txt @@ -10,6 +10,7 @@ nebula_add_library( nebula_add_library( service_obj OBJECT GraphService.cpp + GraphServer.cpp ) nebula_add_library( diff --git a/src/graph/service/GraphServer.cpp b/src/graph/service/GraphServer.cpp new file mode 100644 index 00000000000..25e6ee6cc5e --- /dev/null +++ b/src/graph/service/GraphServer.cpp @@ -0,0 +1,105 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ +#include "GraphServer.h" + +#include +#include + +#include "graph/service/GraphFlags.h" +#include "graph/service/GraphService.h" +namespace nebula { +namespace graph { + +GraphServer::GraphServer(HostAddr localHost) : localHost_(std::move(localHost)) {} + +GraphServer::~GraphServer() { + stop(); +} + +bool GraphServer::start() { + auto threadFactory = std::make_shared("graph-netio"); + auto ioThreadPool = std::make_shared(FLAGS_num_netio_threads, + std::move(threadFactory)); + int numThreads = FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads + : thriftServer_->getNumIOWorkerThreads(); + std::shared_ptr threadManager( + PriorityThreadManager::newPriorityThreadManager(numThreads)); + threadManager->setNamePrefix("executor"); + threadManager->start(); + + thriftServer_ = std::make_unique(); + thriftServer_->setIOThreadPool(ioThreadPool); + + auto interface = std::make_shared(); + auto status = interface->init(ioThreadPool, localHost_); + if (!status.ok()) { + LOG(ERROR) << status; + return false; + } + + graphThread_ = std::make_unique([&] { + thriftServer_->setPort(localHost_.port); + thriftServer_->setInterface(std::move(interface)); + thriftServer_->setReusePort(FLAGS_reuse_port); + thriftServer_->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs)); + thriftServer_->setNumAcceptThreads(FLAGS_num_accept_threads); + thriftServer_->setListenBacklog(FLAGS_listen_backlog); + if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) { + thriftServer_->setSSLConfig(nebula::sslContextConfig()); + } + thriftServer_->setThreadManager(threadManager); + + serverStatus_.store(STATUS_RUNNING); + FLOG_INFO("Starting nebula-graphd on %s:%d\n", localHost_.host.c_str(), localHost_.port); + try { + thriftServer_->serve(); // Blocking wait until shut down via thriftServer_->stop() + } catch (const std::exception &e) { + FLOG_ERROR("Exception thrown while starting the graph RPC server: %s", e.what()); + } + serverStatus_.store(STATUS_STOPPED); + FLOG_INFO("nebula-graphd on %s:%d has been stopped", localHost_.host.c_str(), localHost_.port); + }); + + while (serverStatus_ == STATUS_UNINITIALIZED) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + return true; +} + +void GraphServer::waitUntilStop() { + { + std::unique_lock lkStop(muStop_); + cvStop_.wait(lkStop, [&] { return serverStatus_ != STATUS_RUNNING; }); + } + + thriftServer_->stop(); + + graphThread_->join(); +} + +void GraphServer::notifyStop() { + std::unique_lock lkStop(muStop_); + if (serverStatus_ == STATUS_RUNNING) { + serverStatus_ = STATUS_STOPPED; + cvStop_.notify_one(); + } +} + +void GraphServer::stop() { + if (serverStatus_.load() == ServiceStatus::STATUS_STOPPED) { + LOG(INFO) << "The graph server has been stopped"; + return; + } + + ServiceStatus serverExpected = ServiceStatus::STATUS_RUNNING; + serverStatus_.compare_exchange_strong(serverExpected, STATUS_STOPPED); + + if (thriftServer_) { + thriftServer_->stop(); + } +} + +} // namespace graph +} // namespace nebula diff --git a/src/graph/service/GraphServer.h b/src/graph/service/GraphServer.h new file mode 100644 index 00000000000..671c0720605 --- /dev/null +++ b/src/graph/service/GraphServer.h @@ -0,0 +1,46 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ +#include + +#include +#include +#include + +#include "common/base/Base.h" +#include "common/base/SignalHandler.h" +#include "common/network/NetworkUtils.h" +namespace nebula { +namespace graph { +class GraphServer { + public: + explicit GraphServer(HostAddr localHost); + + ~GraphServer(); + + // Return false if failed. + bool start(); + + void stop(); + + // used for signal handler to set an internal stop flag + void notifyStop(); + + void waitUntilStop(); + + private: + HostAddr localHost_; + + std::shared_ptr ioThreadPool_; + std::shared_ptr workers_; + std::unique_ptr thriftServer_; + std::unique_ptr graphThread_; + + enum ServiceStatus : uint8_t { STATUS_UNINITIALIZED = 0, STATUS_RUNNING = 1, STATUS_STOPPED = 2 }; + std::atomic serverStatus_{STATUS_UNINITIALIZED}; + std::mutex muStop_; + std::condition_variable cvStop_; +}; +} // namespace graph +} // namespace nebula