forked from vesoft-inc/nebula
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
simplify graph signal handler (vesoft-inc#449)
#### What type of PR is this? - [x] bug - [ ] feature - [ ] enhancement #### What does this PR do? closes vesoft-inc#3441 simplify graphd signal handler:Set stop condition instead of waiting all workers stop #### Which issue(s)/PR(s) this PR relates to? In vesoft-inc#3437 , it simplify storage SIGINT signal handler #### Special notes for your reviewer, ex. impact of this fix, etc: #### Additional context/ Design document: #### Checklist: - [ ] Documentation affected (Please add the label if documentation needs to be modified.) - [ ] Incompatibility (If it breaks the compatibility, please describe it and add the corresponding label.) - [ ] If it's needed to cherry-pick (If cherry-pick to some branches is required, please label the destination version(s).) - [ ] Performance impacted: Consumes more CPU/Memory #### Release notes: Please confirm whether to be reflected in release notes and how to describe: > ` Migrated from vesoft-inc#3542 Co-authored-by: endy.li <[email protected]>
- Loading branch information
1 parent
57cf00b
commit b1bea3d
Showing
4 changed files
with
162 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* Copyright (c) 2021 vesoft inc. All rights reserved. | ||
* | ||
* This source code is licensed under Apache 2.0 License. | ||
*/ | ||
#include "GraphServer.h" | ||
|
||
#include <memory> | ||
#include <utility> | ||
|
||
#include "graph/service/GraphFlags.h" | ||
#include "graph/service/GraphService.h" | ||
namespace nebula { | ||
namespace graph { | ||
|
||
GraphServer::GraphServer(HostAddr localHost) : localHost_(std::move(localHost)) {} | ||
|
||
GraphServer::~GraphServer() { | ||
stop(); | ||
} | ||
|
||
bool GraphServer::start() { | ||
auto threadFactory = std::make_shared<folly::NamedThreadFactory>("graph-netio"); | ||
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_netio_threads, | ||
std::move(threadFactory)); | ||
int numThreads = FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads | ||
: thriftServer_->getNumIOWorkerThreads(); | ||
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager( | ||
PriorityThreadManager::newPriorityThreadManager(numThreads)); | ||
threadManager->setNamePrefix("executor"); | ||
threadManager->start(); | ||
|
||
thriftServer_ = std::make_unique<apache::thrift::ThriftServer>(); | ||
thriftServer_->setIOThreadPool(ioThreadPool); | ||
|
||
auto interface = std::make_shared<GraphService>(); | ||
auto status = interface->init(ioThreadPool, localHost_); | ||
if (!status.ok()) { | ||
LOG(ERROR) << status; | ||
return false; | ||
} | ||
|
||
graphThread_ = std::make_unique<std::thread>([&] { | ||
thriftServer_->setPort(localHost_.port); | ||
thriftServer_->setInterface(std::move(interface)); | ||
thriftServer_->setReusePort(FLAGS_reuse_port); | ||
thriftServer_->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs)); | ||
thriftServer_->setNumAcceptThreads(FLAGS_num_accept_threads); | ||
thriftServer_->setListenBacklog(FLAGS_listen_backlog); | ||
if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) { | ||
thriftServer_->setSSLConfig(nebula::sslContextConfig()); | ||
} | ||
thriftServer_->setThreadManager(threadManager); | ||
|
||
serverStatus_.store(STATUS_RUNNING); | ||
FLOG_INFO("Starting nebula-graphd on %s:%d\n", localHost_.host.c_str(), localHost_.port); | ||
try { | ||
thriftServer_->serve(); // Blocking wait until shut down via thriftServer_->stop() | ||
} catch (const std::exception &e) { | ||
FLOG_ERROR("Exception thrown while starting the graph RPC server: %s", e.what()); | ||
} | ||
serverStatus_.store(STATUS_STOPPED); | ||
FLOG_INFO("nebula-graphd on %s:%d has been stopped", localHost_.host.c_str(), localHost_.port); | ||
}); | ||
|
||
while (serverStatus_ == STATUS_UNINITIALIZED) { | ||
std::this_thread::sleep_for(std::chrono::microseconds(100)); | ||
} | ||
return true; | ||
} | ||
|
||
void GraphServer::waitUntilStop() { | ||
{ | ||
std::unique_lock<std::mutex> lkStop(muStop_); | ||
cvStop_.wait(lkStop, [&] { return serverStatus_ != STATUS_RUNNING; }); | ||
} | ||
|
||
thriftServer_->stop(); | ||
|
||
graphThread_->join(); | ||
} | ||
|
||
void GraphServer::notifyStop() { | ||
std::unique_lock<std::mutex> lkStop(muStop_); | ||
if (serverStatus_ == STATUS_RUNNING) { | ||
serverStatus_ = STATUS_STOPPED; | ||
cvStop_.notify_one(); | ||
} | ||
} | ||
|
||
void GraphServer::stop() { | ||
if (serverStatus_.load() == ServiceStatus::STATUS_STOPPED) { | ||
LOG(INFO) << "The graph server has been stopped"; | ||
return; | ||
} | ||
|
||
ServiceStatus serverExpected = ServiceStatus::STATUS_RUNNING; | ||
serverStatus_.compare_exchange_strong(serverExpected, STATUS_STOPPED); | ||
|
||
if (thriftServer_) { | ||
thriftServer_->stop(); | ||
} | ||
} | ||
|
||
} // namespace graph | ||
} // namespace nebula |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* Copyright (c) 2021 vesoft inc. All rights reserved. | ||
* | ||
* This source code is licensed under Apache 2.0 License. | ||
*/ | ||
#include <thrift/lib/cpp2/server/ThriftServer.h> | ||
|
||
#include <cstdint> | ||
#include <mutex> | ||
#include <thread> | ||
|
||
#include "common/base/Base.h" | ||
#include "common/base/SignalHandler.h" | ||
#include "common/network/NetworkUtils.h" | ||
namespace nebula { | ||
namespace graph { | ||
class GraphServer { | ||
public: | ||
explicit GraphServer(HostAddr localHost); | ||
|
||
~GraphServer(); | ||
|
||
// Return false if failed. | ||
bool start(); | ||
|
||
void stop(); | ||
|
||
// used for signal handler to set an internal stop flag | ||
void notifyStop(); | ||
|
||
void waitUntilStop(); | ||
|
||
private: | ||
HostAddr localHost_; | ||
|
||
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_; | ||
std::shared_ptr<apache::thrift::concurrency::ThreadManager> workers_; | ||
std::unique_ptr<apache::thrift::ThriftServer> thriftServer_; | ||
std::unique_ptr<std::thread> graphThread_; | ||
|
||
enum ServiceStatus : uint8_t { STATUS_UNINITIALIZED = 0, STATUS_RUNNING = 1, STATUS_STOPPED = 2 }; | ||
std::atomic<ServiceStatus> serverStatus_{STATUS_UNINITIALIZED}; | ||
std::mutex muStop_; | ||
std::condition_variable cvStop_; | ||
}; | ||
} // namespace graph | ||
} // namespace nebula |