Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplify graph signal handler #3542

Merged
merged 14 commits into from
Dec 30, 2021
Merged
56 changes: 10 additions & 46 deletions src/daemons/GraphDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
#include <thrift/lib/cpp2/server/ThriftServer.h>

#include "common/base/Base.h"
#include "common/base/SignalHandler.h"
#include "common/fs/FileUtils.h"
#include "common/network/NetworkUtils.h"
#include "common/process/ProcessUtils.h"
#include "common/ssl/SSLConfig.h"
#include "common/time/TimezoneInfo.h"
#include "daemons/SetupLogging.h"
#include "graph/service/GraphFlags.h"
#include "graph/service/GraphServer.h"
#include "graph/service/GraphService.h"
#include "graph/stats/GraphStats.h"
#include "version/Version.h"
Expand All @@ -30,19 +30,18 @@ using nebula::fs::FileUtils;
using nebula::graph::GraphService;
using nebula::network::NetworkUtils;

static std::unique_ptr<apache::thrift::ThriftServer> gServer;

static void signalHandler(int sig);
static Status setupSignalHandler();
static void printHelp(const char *prog);
static void setupThreadManager();
#if defined(__x86_64__)
extern Status setupBreakpad();
#endif

DECLARE_string(flagfile);
DECLARE_bool(containerized);

std::unique_ptr<nebula::graph::GraphServer> gServer;
heroicNeZha marked this conversation as resolved.
Show resolved Hide resolved

int main(int argc, char *argv[]) {
google::SetVersionString(nebula::versionString());
if (argc == 1) {
Expand Down Expand Up @@ -154,47 +153,22 @@ int main(int argc, char *argv[]) {
}
LOG(INFO) << "Number of worker threads: " << FLAGS_num_worker_threads;

auto threadFactory = std::make_shared<folly::NamedThreadFactory>("graph-netio");
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_netio_threads,
std::move(threadFactory));
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setIOThreadPool(ioThreadPool);

auto interface = std::make_shared<GraphService>();
status = interface->init(ioThreadPool, localhost);
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}

gServer->setPort(localhost.port);
gServer->setInterface(std::move(interface));
gServer->setReusePort(FLAGS_reuse_port);
gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs));
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
gServer->setListenBacklog(FLAGS_listen_backlog);
if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) {
gServer->setSSLConfig(nebula::sslContextConfig());
}
setupThreadManager();

// Setup the signal handlers
status = setupSignalHandler();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}

FLOG_INFO("Starting nebula-graphd on %s:%d\n", localhost.host.c_str(), localhost.port);
try {
gServer->serve(); // Blocking wait until shut down via gServer->stop()
} catch (const std::exception &e) {
FLOG_ERROR("Exception thrown while starting the RPC server: %s", e.what());
gServer = std::make_unique<nebula::graph::GraphServer>(localhost);

if (!gServer->start()) {
LOG(ERROR) << "The graph server start failed";
return EXIT_FAILURE;
}

FLOG_INFO("nebula-graphd on %s:%d has been stopped", localhost.host.c_str(), localhost.port);

gServer->waitUntilStop();
LOG(INFO) << "The graph Daemon stopped";
return EXIT_SUCCESS;
}

Expand All @@ -209,7 +183,7 @@ void signalHandler(int sig) {
case SIGINT:
case SIGTERM:
FLOG_INFO("Signal %d(%s) received, stopping this server", sig, ::strsignal(sig));
gServer->stop();
gServer->notifyStop();
break;
default:
FLOG_ERROR("Signal %d(%s) received but ignored", sig, ::strsignal(sig));
Expand All @@ -219,13 +193,3 @@ void signalHandler(int sig) {
// Prints the one-line usage hint for the daemon to stderr.
// `prog` is the program name shown at the start of the line.
void printHelp(const char *prog) {
  const char *const usageTail = "--flagfile <config_file>";
  fprintf(stderr, "%s %s\n", prog, usageTail);
}

// Creates the worker thread manager and installs it on the global gServer.
// gServer must already be constructed, because it is dereferenced below.
void setupThreadManager() {
// A positive FLAGS_num_worker_threads wins; otherwise the server's IO worker
// thread count is used as the pool size.
int numThreads =
FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads : gServer->getNumIOWorkerThreads();
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager(
PriorityThreadManager::newPriorityThreadManager(numThreads));
// Threads created by this manager carry the "executor" name prefix.
threadManager->setNamePrefix("executor");
threadManager->start();
gServer->setThreadManager(threadManager);
}
1 change: 1 addition & 0 deletions src/graph/service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ nebula_add_library(
# Sources compiled into the service_obj target (declared with the OBJECT keyword).
# GraphServer.cpp is listed next to GraphService.cpp so both are built together.
nebula_add_library(
service_obj OBJECT
GraphService.cpp
GraphServer.cpp
)

nebula_add_library(
Expand Down
105 changes: 105 additions & 0 deletions src/graph/service/GraphServer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#include "GraphServer.h"

#include <memory>
#include <utility>

#include "graph/service/GraphFlags.h"
#include "graph/service/GraphService.h"
namespace nebula {
namespace graph {

// Only records the address to serve on; the thrift server and its thread are
// created later, in start().
GraphServer::GraphServer(HostAddr localHost) : localHost_(std::move(localHost)) {}

// Requests a stop on destruction. Note that this does not join graphThread_;
// the join happens only in waitUntilStop().
GraphServer::~GraphServer() {
stop();
}

bool GraphServer::start() {
auto threadFactory = std::make_shared<folly::NamedThreadFactory>("graph-netio");
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_netio_threads,
std::move(threadFactory));
int numThreads = FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads
: thriftServer_->getNumIOWorkerThreads();
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager(
PriorityThreadManager::newPriorityThreadManager(numThreads));
threadManager->setNamePrefix("executor");
threadManager->start();

thriftServer_ = std::make_unique<apache::thrift::ThriftServer>();
thriftServer_->setIOThreadPool(ioThreadPool);

auto interface = std::make_shared<GraphService>();
auto status = interface->init(ioThreadPool, localHost_);
if (!status.ok()) {
LOG(ERROR) << status;
return false;
}

graphThread_ = std::make_unique<std::thread>([&] {
thriftServer_->setPort(localHost_.port);
thriftServer_->setInterface(std::move(interface));
thriftServer_->setReusePort(FLAGS_reuse_port);
thriftServer_->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs));
thriftServer_->setNumAcceptThreads(FLAGS_num_accept_threads);
thriftServer_->setListenBacklog(FLAGS_listen_backlog);
if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) {
thriftServer_->setSSLConfig(nebula::sslContextConfig());
}
thriftServer_->setThreadManager(threadManager);

serverStatus_.store(STATUS_RUNNING);
FLOG_INFO("Starting nebula-graphd on %s:%d\n", localHost_.host.c_str(), localHost_.port);
try {
thriftServer_->serve(); // Blocking wait until shut down via thriftServer_->stop()
} catch (const std::exception &e) {
FLOG_ERROR("Exception thrown while starting the graph RPC server: %s", e.what());
}
serverStatus_.store(STATUS_STOPPED);
FLOG_INFO("nebula-graphd on %s:%d has been stopped", localHost_.host.c_str(), localHost_.port);
});

while (serverStatus_ == STATUS_UNINITIALIZED) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
return true;
}

// Blocks the caller while the server status is STATUS_RUNNING, then stops the
// thrift server and joins the serving thread.
void GraphServer::waitUntilStop() {
  {
    std::unique_lock<std::mutex> guard(muStop_);
    // Re-check after every wakeup; the mutex is released again at scope exit,
    // before the server is stopped.
    while (serverStatus_ == STATUS_RUNNING) {
      cvStop_.wait(guard);
    }
  }

  thriftServer_->stop();
  graphThread_->join();
}

// Flags a running server as stopped and wakes one thread blocked in
// waitUntilStop(). It does not call thriftServer_->stop() itself; that is done
// by waitUntilStop() after it wakes up.
// NOTE(review): this takes a std::mutex; if it is invoked directly from a
// signal context that is not async-signal-safe -- confirm how the handler is
// dispatched.
void GraphServer::notifyStop() {
std::unique_lock<std::mutex> lkStop(muStop_);
heroicNeZha marked this conversation as resolved.
Show resolved Hide resolved
// Only a RUNNING server is transitioned; any other state is left untouched and
// nobody is notified.
if (serverStatus_ == STATUS_RUNNING) {
serverStatus_ = STATUS_STOPPED;
cvStop_.notify_one();
}
}

// Stops the thrift server unless the status already reads STATUS_STOPPED.
// It neither waits on cvStop_ nor joins graphThread_.
void GraphServer::stop() {
// Already flagged as stopped (e.g. by notifyStop() or by the serving thread
// after serve() returned): only log and leave.
if (serverStatus_.load() == ServiceStatus::STATUS_STOPPED) {
LOG(INFO) << "The graph server has been stopped";
return;
}

// Move RUNNING -> STOPPED. The result is ignored, so a server that is still
// STATUS_UNINITIALIZED keeps that status.
ServiceStatus serverExpected = ServiceStatus::STATUS_RUNNING;
serverStatus_.compare_exchange_strong(serverExpected, STATUS_STOPPED);
Copy link
Contributor

@Shylock-Hg Shylock-Hg Dec 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't wait it to be running?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it wait in thriftServer.stop


// thriftServer_ is only created inside start(), so it may still be empty here.
if (thriftServer_) {
thriftServer_->stop();
}
}

} // namespace graph
} // namespace nebula
46 changes: 46 additions & 0 deletions src/graph/service/GraphServer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#include <thrift/lib/cpp2/server/ThriftServer.h>

#include <cstdint>
#include <mutex>
#include <thread>

#include "common/base/Base.h"
#include "common/base/SignalHandler.h"
#include "common/network/NetworkUtils.h"
namespace nebula {
namespace graph {
// Owns the graph daemon's thrift server together with the thread that runs its
// blocking serve() loop, and exposes start/stop/wait primitives for the daemon.
class GraphServer {
public:
// Stores the address to serve on; nothing is started here.
explicit GraphServer(HostAddr localHost);

// Calls stop().
~GraphServer();

// Return false if failed.
// Failure means the GraphService handler could not be initialized; on success
// the serving thread has been spawned.
bool start();

// Stops the thrift server unless the status is already STATUS_STOPPED.
void stop();

// used for signal handler to set an internal stop flag
// Marks a running server as stopped and wakes a thread blocked in
// waitUntilStop().
void notifyStop();

// Blocks while the status is STATUS_RUNNING, then stops the thrift server and
// joins the serving thread.
void waitUntilStop();

private:
// Address (host and port) the thrift server listens on.
HostAddr localHost_;

// NOTE(review): ioThreadPool_ and workers_ are not assigned in the
// GraphServer.cpp shown with this header (start() uses locals) -- confirm
// whether they are still needed.
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::shared_ptr<apache::thrift::concurrency::ThreadManager> workers_;
// Created in start(); empty until then.
std::unique_ptr<apache::thrift::ThriftServer> thriftServer_;
// Thread that runs thriftServer_->serve(); joined in waitUntilStop().
std::unique_ptr<std::thread> graphThread_;

// Lifecycle of the server as tracked by serverStatus_.
enum ServiceStatus : uint8_t { STATUS_UNINITIALIZED = 0, STATUS_RUNNING = 1, STATUS_STOPPED = 2 };
std::atomic<ServiceStatus> serverStatus_{STATUS_UNINITIALIZED};
// muStop_/cvStop_ let waitUntilStop() sleep until notifyStop() signals.
std::mutex muStop_;
std::condition_variable cvStop_;
};
} // namespace graph
} // namespace nebula