diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index a2cfb188063..5eababb47a6 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -33,11 +33,9 @@ namespace nebula { namespace kvstore { NebulaStore::~NebulaStore() { + stop(); LOG(INFO) << "Cut off the relationship with meta client"; options_.partMan_.reset(); - raftService_->stop(); - LOG(INFO) << "Waiting for the raft service stop..."; - raftService_->waitUntilStop(); spaces_.clear(); spaceListeners_.clear(); bgWorkers_->stop(); @@ -55,7 +53,7 @@ bool NebulaStore::init() { CHECK(storeWorker_->start()); snapshot_.reset(new NebulaSnapshotManager(this)); raftService_ = raftex::RaftexService::createService(ioPool_, workers_, raftAddr_.port); - if (!raftService_->start()) { + if (raftService_ == nullptr) { LOG(ERROR) << "Start the raft service failed"; return false; } @@ -272,6 +270,7 @@ void NebulaStore::stop() { LOG(INFO) << "Stop the raft service..."; raftService_->stop(); + LOG(INFO) << "Stop kv engine..."; for (const auto& space : spaces_) { for (const auto& engine : space.second->engines_) { engine->stop(); diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 0bfbfba53e6..356aa38f269 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -111,7 +111,7 @@ class NebulaStore : public KVStore, public Handler { bool init(); /** - * @brief Stop the raft service and kv engine + * @brief Stop the engine */ void stop() override; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index b8fb73a55f7..68efbe05c2c 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -341,6 +341,8 @@ void RaftPart::stop() { VLOG(1) << idStr_ << "Invoked stop() on all peer hosts"; + // Host::waitForStop will wait for a callback executed in the ioThreadPool, so make sure + // RaftPart::stop is NOT executed in the same ioThreadPool for (auto& h : hosts) { VLOG(1) << idStr_ << "Waiting " << h->idStr() << " to stop"; h->waitForStop(); diff --git a/src/kvstore/raftex/RaftexService.cpp b/src/kvstore/raftex/RaftexService.cpp index 314f7bd8b84..a39ac5a78d7 100644 --- a/src/kvstore/raftex/RaftexService.cpp +++ b/src/kvstore/raftex/RaftexService.cpp @@ -21,93 +21,37 @@ namespace raftex { * ******************************************************/ std::shared_ptr RaftexService::createService( - std::shared_ptr pool, + std::shared_ptr ioPool, std::shared_ptr workers, uint16_t port) { - auto svc = std::shared_ptr(new RaftexService()); - CHECK(svc != nullptr) << "Failed to create a raft service"; - - svc->server_ = std::make_unique(); - CHECK(svc->server_ != nullptr) << "Failed to create a thrift server"; - svc->server_->setInterface(svc); - - svc->initThriftServer(pool, workers, port); - return svc; -} - -RaftexService::~RaftexService() {} - -bool RaftexService::start() { - serverThread_.reset(new std::thread([&] { serve(); })); - - waitUntilReady(); - - // start failed, reclaim resource - if (status_.load() != STATUS_RUNNING) { - waitUntilStop(); - return false; - } - - return true; -} - -void RaftexService::waitUntilReady() { - while (status_.load() == STATUS_NOT_RUNNING) { - std::this_thread::sleep_for(std::chrono::microseconds(100)); - } -} - -void RaftexService::initThriftServer(std::shared_ptr pool, - std::shared_ptr workers, - uint16_t port) { - LOG(INFO) << "Init thrift server for raft service, port: " << port; - if (FLAGS_enable_ssl) { - server_->setSSLConfig(nebula::sslContextConfig()); - } - server_->setPort(port); - 
server_->setIdleTimeout(std::chrono::seconds(0)); - if (pool != nullptr) { - server_->setIOThreadPool(pool); - } - if (workers != nullptr) { - server_->setThreadManager( - std::dynamic_pointer_cast(workers)); - } - server_->setStopWorkersOnStopListening(false); -} - -bool RaftexService::setup() { try { - server_->setup(); - serverPort_ = server_->getAddress().getPort(); - - LOG(INFO) << "Starting the Raftex Service on " << serverPort_; + auto svc = std::shared_ptr(new RaftexService()); + auto server = std::make_unique(); + server->setPort(port); + server->setIdleTimeout(std::chrono::seconds(0)); + if (ioPool != nullptr) { + server->setIOThreadPool(ioPool); + } + if (workers != nullptr) { + server->setThreadManager( + std::dynamic_pointer_cast(workers)); + } + if (FLAGS_enable_ssl) { + server->setSSLConfig(nebula::sslContextConfig()); + } + server->setInterface(svc); + server->setup(); + svc->server_ = std::move(server); + svc->serverPort_ = svc->server_->getAddress().getPort(); + LOG(INFO) << "Start raft service on " << svc->serverPort_; + return svc; } catch (const std::exception& e) { - LOG(ERROR) << "Setup the Raftex Service failed, error: " << e.what(); - return false; + LOG(ERROR) << "Start raft service failed: " << e.what(); + return nullptr; + } catch (...) { + LOG(ERROR) << "Start raft service failed"; + return nullptr; } - - return true; -} - -void RaftexService::serve() { - LOG(INFO) << "Starting the Raftex Service"; - - if (!setup()) { - status_.store(STATUS_SETUP_FAILED); - return; - } - - SCOPE_EXIT { - server_->cleanUp(); - }; - - status_.store(STATUS_RUNNING); - LOG(INFO) << "Start the Raftex Service successfully"; - server_->getEventBaseManager()->getEventBase()->loopForever(); - - status_.store(STATUS_NOT_RUNNING); - LOG(INFO) << "The Raftex Service stopped"; } std::shared_ptr RaftexService::getIOThreadPool() const { @@ -119,11 +63,6 @@ std::shared_ptr RaftexService::getThreadManager() { } void RaftexService::stop() { - int expected = STATUS_RUNNING; - if (!status_.compare_exchange_strong(expected, STATUS_NOT_RUNNING)) { - return; - } - // stop service LOG(INFO) << "Stopping the raftex service on port " << serverPort_; std::unordered_map, std::shared_ptr> parts; @@ -139,16 +78,6 @@ void RaftexService::stop() { server_->stop(); } -void RaftexService::waitUntilStop() { - if (serverThread_) { - serverThread_->join(); - serverThread_.reset(); - server_.reset(); - LOG(INFO) << "Server thread has stopped. Service on port " << serverPort_ - << " is ready to be destroyed"; - } -} - void RaftexService::addPartition(std::shared_ptr part) { // todo(doodle): If we need to start both listener and normal replica on same // hosts, this class need to be aware of type. 
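Note on the new comment in RaftPart::stop() above: Host::waitForStop() blocks until a callback that runs on the raft IO thread pool has fired, so calling stop() from a thread of that same pool can starve the very callback it waits for. A minimal stand-alone sketch of the hazard (hypothetical code using plain folly, not part of this patch):

```cpp
// Hypothetical illustration: blocking a pool's only thread while waiting for
// another task queued to the same pool starves that task (self-deadlock).
#include <chrono>
#include <iostream>

#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/synchronization/Baton.h>

int main() {
  folly::IOThreadPoolExecutor pool(1);  // one thread makes the hazard deterministic
  folly::Baton<> inner;
  folly::Baton<> outer;

  pool.add([&] {
    // Occupy the only IO thread, then wait for work that needs this same thread.
    pool.add([&] { inner.post(); });
    bool ran = inner.try_wait_for(std::chrono::seconds(1));
    std::cout << (ran ? "inner task ran" : "deadlock: inner task starved") << "\n";
    outer.post();
  });

  outer.wait();  // prints "deadlock: inner task starved"
  pool.join();   // the queued inner task finally runs once the outer lambda returns
  return 0;
}
```

This is why the stop path is driven from the caller's thread (e.g. NebulaStore::stop() or the destructor) rather than from the raft ioThreadPool itself.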
diff --git a/src/kvstore/raftex/RaftexService.h b/src/kvstore/raftex/RaftexService.h index 74d9245b7bf..e6fb496ac24 100644 --- a/src/kvstore/raftex/RaftexService.h +++ b/src/kvstore/raftex/RaftexService.h @@ -27,20 +27,20 @@ class RaftexService : public cpp2::RaftexServiceSvIf { /** * @brief Create a raft service * - * @param pool IOThreadPool to use + * @param ioPool IOThreadPool to use * @param workers Worker thread pool to use * @param port Listen port of thrift server * @return std::shared_ptr */ static std::shared_ptr createService( - std::shared_ptr pool, + std::shared_ptr ioPool, std::shared_ptr workers, uint16_t port = 0); /** * @brief Destroy the Raftex Service */ - virtual ~RaftexService(); + virtual ~RaftexService() = default; /** * @brief Return the raft thrift server port @@ -64,22 +64,10 @@ class RaftexService : public cpp2::RaftexServiceSvIf { std::shared_ptr getThreadManager(); /** - * @brief Start the raft thrift server - * - * @return Whether start succeed - */ - bool start(); - - /** - * @brief Set the state to stopped + * @brief Stop all partitions and wait thrift server stops */ void stop(); - /** - * @brief Wait until the thrift server has been stopped - */ - void waitUntilStop(); - /** * @brief Handle leader election request in worker thread * @@ -143,40 +131,11 @@ class RaftexService : public cpp2::RaftexServiceSvIf { std::shared_ptr findPart(GraphSpaceID spaceId, PartitionID partId); private: - /** - * @brief Start the thrift server - * - * @param pool IO thread pool - * @param workers Worker thread pool - * @param port Thrift port to listener - */ - void initThriftServer(std::shared_ptr pool, - std::shared_ptr workers, - uint16_t port = 0); - - /** - * @brief Prepare the setup of thrift server - * - * @return Return whether succeed - */ - bool setup(); - void serve(); - - /** - * @brief Wait until the service is ready to serve - */ - void waitUntilReady(); - RaftexService() = default; - private: std::unique_ptr server_; - std::unique_ptr serverThread_; uint32_t serverPort_; - enum RaftServiceStatus { STATUS_NOT_RUNNING = 0, STATUS_SETUP_FAILED = 1, STATUS_RUNNING = 2 }; - std::atomic_int status_{STATUS_NOT_RUNNING}; - folly::RWSpinLock partsLock_; std::unordered_map, std::shared_ptr> parts_; }; diff --git a/src/kvstore/raftex/test/RaftexTestBase.cpp b/src/kvstore/raftex/test/RaftexTestBase.cpp index 951dea666b9..ca59ccaeefb 100644 --- a/src/kvstore/raftex/test/RaftexTestBase.cpp +++ b/src/kvstore/raftex/test/RaftexTestBase.cpp @@ -161,7 +161,6 @@ void setupRaft(int32_t numCopies, // Set up services for (int i = 0; i < numCopies; ++i) { services.emplace_back(RaftexService::createService(nullptr, nullptr)); - if (!services.back()->start()) return; uint16_t port = services.back()->getServerPort(); allHosts.emplace_back(ipStr, port); } @@ -217,7 +216,7 @@ void finishRaft(std::vector>& services, workers->wait(); LOG(INFO) << "Waiting for all service stopped"; for (auto& svc : services) { - svc->waitUntilStop(); + svc->stop(); } } diff --git a/src/mock/MockCluster.h b/src/mock/MockCluster.h index 45493ce4aec..28b380e793d 100644 --- a/src/mock/MockCluster.h +++ b/src/mock/MockCluster.h @@ -34,8 +34,6 @@ class MockCluster { ~MockCluster() { stop(); - storageAdminServer_.reset(); - graphStorageServer_.reset(); } void startAll(); @@ -94,6 +92,16 @@ class MockCluster { } void stop() { + if (storageKV_) { + storageKV_->stop(); + } + if (metaKV_) { + metaKV_->stop(); + } + + storageAdminServer_.reset(); + graphStorageServer_.reset(); + if (metaClient_) { 
metaClient_->notifyStop(); metaClient_->stop(); @@ -102,12 +110,6 @@ class MockCluster { metaClient_->notifyStop(); lMetaClient_->stop(); } - if (metaKV_) { - metaKV_->stop(); - } - if (storageKV_) { - storageKV_->stop(); - } if (esListener_) { esListener_->stop(); } diff --git a/src/storage/StorageServer.cpp b/src/storage/StorageServer.cpp index 4ccdc2307c8..c725f38a528 100644 --- a/src/storage/StorageServer.cpp +++ b/src/storage/StorageServer.cpp @@ -63,10 +63,6 @@ StorageServer::StorageServer(HostAddr localHost, walPath_(std::move(walPath)), listenerPath_(std::move(listenerPath)) {} -StorageServer::~StorageServer() { - stop(); -} - std::unique_ptr StorageServer::getStoreInstance() { kvstore::KVOptions options; options.dataPaths_ = dataPaths_; @@ -238,102 +234,13 @@ bool StorageServer::start() { return false; } - storageThread_.reset(new std::thread([this] { - try { - auto handler = std::make_shared(env_.get()); -#ifndef BUILD_STANDALONE - storageServer_ = std::make_unique(); - storageServer_->setPort(FLAGS_port); - storageServer_->setIdleTimeout(std::chrono::seconds(0)); - storageServer_->setIOThreadPool(ioThreadPool_); - storageServer_->setStopWorkersOnStopListening(false); - if (FLAGS_enable_ssl) { - storageServer_->setSSLConfig(nebula::sslContextConfig()); - } -#else - storageServer_ = GraphStorageLocalServer::getInstance(); -#endif - - storageServer_->setThreadManager(workers_); - storageServer_->setInterface(std::move(handler)); - ServiceStatus expected = STATUS_UNINITIALIZED; - if (!storageSvcStatus_.compare_exchange_strong(expected, STATUS_RUNNING)) { - LOG(ERROR) << "Impossible! How could it happen!"; - return; - } - LOG(INFO) << "The storage service start on " << localHost_; - storageServer_->serve(); // Will wait until the server shuts down - } catch (const std::exception& e) { - LOG(ERROR) << "Start storage service failed, error:" << e.what(); - } - storageSvcStatus_.store(STATUS_STOPPED); - LOG(INFO) << "The storage service stopped"; - })); - - adminThread_.reset(new std::thread([this] { - try { - auto handler = std::make_shared(env_.get()); - auto adminAddr = Utils::getAdminAddrFromStoreAddr(localHost_); - adminServer_ = std::make_unique(); - adminServer_->setPort(adminAddr.port); - adminServer_->setIdleTimeout(std::chrono::seconds(0)); - adminServer_->setIOThreadPool(ioThreadPool_); - adminServer_->setThreadManager(workers_); - adminServer_->setStopWorkersOnStopListening(false); - adminServer_->setInterface(std::move(handler)); - if (FLAGS_enable_ssl) { - adminServer_->setSSLConfig(nebula::sslContextConfig()); - } - - ServiceStatus expected = STATUS_UNINITIALIZED; - if (!adminSvcStatus_.compare_exchange_strong(expected, STATUS_RUNNING)) { - LOG(ERROR) << "Impossible! 
How could it happen!"; - return; - } - LOG(INFO) << "The admin service start on " << adminAddr; - adminServer_->serve(); // Will wait until the server shuts down - } catch (const std::exception& e) { - LOG(ERROR) << "Start admin service failed, error:" << e.what(); - } - adminSvcStatus_.store(STATUS_STOPPED); - LOG(INFO) << "The admin service stopped"; - })); - - internalStorageThread_.reset(new std::thread([this] { - try { - auto handler = std::make_shared(env_.get()); - auto internalAddr = Utils::getInternalAddrFromStoreAddr(localHost_); - internalStorageServer_ = std::make_unique(); - internalStorageServer_->setPort(internalAddr.port); - internalStorageServer_->setIdleTimeout(std::chrono::seconds(0)); - internalStorageServer_->setIOThreadPool(ioThreadPool_); - internalStorageServer_->setThreadManager(workers_); - internalStorageServer_->setStopWorkersOnStopListening(false); - internalStorageServer_->setInterface(std::move(handler)); - if (FLAGS_enable_ssl) { - internalStorageServer_->setSSLConfig(nebula::sslContextConfig()); - } - - internalStorageSvcStatus_.store(STATUS_RUNNING); - LOG(INFO) << "The internal storage service start(same with admin) on " << internalAddr; - internalStorageServer_->serve(); // Will wait until the server shuts down - } catch (const std::exception& e) { - LOG(ERROR) << "Start internal storage service failed, error:" << e.what(); - } - internalStorageSvcStatus_.store(STATUS_STOPPED); - LOG(INFO) << "The internal storage service stopped"; - })); - - while (storageSvcStatus_.load() == STATUS_UNINITIALIZED || - adminSvcStatus_.load() == STATUS_UNINITIALIZED || - internalStorageSvcStatus_.load() == STATUS_UNINITIALIZED) { - std::this_thread::sleep_for(std::chrono::microseconds(100)); - } - - if (storageSvcStatus_.load() != STATUS_RUNNING || adminSvcStatus_.load() != STATUS_RUNNING || - internalStorageSvcStatus_.load() != STATUS_RUNNING) { + storageServer_ = getStorageServer(); + adminServer_ = getAdminServer(); + internalStorageServer_ = getInternalServer(); + if (!storageServer_ || !adminServer_ || !internalStorageServer_) { return false; } + { std::lock_guard lkStop(muStop_); if (serverStatus_ != STATUS_UNINITIALIZED) { @@ -354,10 +261,6 @@ void StorageServer::waitUntilStop() { } this->stop(); - - adminThread_->join(); - storageThread_->join(); - internalStorageThread_->join(); } void StorageServer::notifyStop() { @@ -372,29 +275,29 @@ void StorageServer::notifyStop() { } void StorageServer::stop() { - if (adminSvcStatus_.load() == ServiceStatus::STATUS_STOPPED && - storageSvcStatus_.load() == ServiceStatus::STATUS_STOPPED && - internalStorageSvcStatus_.load() == ServiceStatus::STATUS_STOPPED) { - LOG(INFO) << "All services has been stopped"; - return; - } - - ServiceStatus adminExpected = ServiceStatus::STATUS_RUNNING; - adminSvcStatus_.compare_exchange_strong(adminExpected, STATUS_STOPPED); - - ServiceStatus storageExpected = ServiceStatus::STATUS_RUNNING; - storageSvcStatus_.compare_exchange_strong(storageExpected, STATUS_STOPPED); - - ServiceStatus interStorageExpected = ServiceStatus::STATUS_RUNNING; - internalStorageSvcStatus_.compare_exchange_strong(interStorageExpected, STATUS_STOPPED); + // Stop http service + webSvc_.reset(); - // kvstore need to stop back ground job before http server dctor + // Stop all thrift server: raft/storage/admin/internal if (kvstore_) { + // stop kvstore background job and raft services kvstore_->stop(); } + if (adminServer_) { + adminServer_->cleanUp(); + } + if (internalStorageServer_) { + internalStorageServer_->cleanUp(); 
+ } + if (storageServer_) { +#ifndef BUILD_STANDALONE + storageServer_->cleanUp(); +#else + storageServer_->stop(); +#endif + } - webSvc_.reset(); - + // Stop all interfaces related to kvstore if (txnMan_) { txnMan_->stop(); txnMan_->join(); @@ -403,19 +306,94 @@ } taskMgr_->shutdown(); } if (metaClient_) { - metaClient_->notifyStop(); + metaClient_->stop(); } + + // Stop kvstore if (kvstore_) { kvstore_.reset(); } - if (adminServer_) { - adminServer_->stop(); +} + +#ifndef BUILD_STANDALONE +std::unique_ptr StorageServer::getStorageServer() { + try { + auto handler = std::make_shared(env_.get()); + auto server = std::make_unique(); + server->setPort(FLAGS_port); + server->setIdleTimeout(std::chrono::seconds(0)); + server->setIOThreadPool(ioThreadPool_); + server->setThreadManager(workers_); + if (FLAGS_enable_ssl) { + server->setSSLConfig(nebula::sslContextConfig()); + } + server->setInterface(std::move(handler)); + server->setup(); + return server; + } catch (const std::exception& e) { + LOG(ERROR) << "Start storage server failed: " << e.what(); + return nullptr; + } catch (...) { + LOG(ERROR) << "Start storage server failed"; + return nullptr; } - if (internalStorageServer_) { - internalStorageServer_->stop(); +} +#else +std::shared_ptr StorageServer::getStorageServer() { + auto handler = std::make_shared(env_.get()); + auto server = GraphStorageLocalServer::getInstance(); + server->setThreadManager(workers_); + server->setInterface(std::move(handler)); + server->serve(); + return server; +} +#endif + +std::unique_ptr StorageServer::getAdminServer() { + try { + auto handler = std::make_shared(env_.get()); + auto adminAddr = Utils::getAdminAddrFromStoreAddr(localHost_); + auto server = std::make_unique(); + server->setPort(adminAddr.port); + server->setIdleTimeout(std::chrono::seconds(0)); + server->setIOThreadPool(ioThreadPool_); + server->setThreadManager(workers_); + if (FLAGS_enable_ssl) { + server->setSSLConfig(nebula::sslContextConfig()); + } + server->setInterface(std::move(handler)); + server->setup(); + return server; + } catch (const std::exception& e) { + LOG(ERROR) << "Start admin server failed: " << e.what(); + return nullptr; + } catch (...) { + LOG(ERROR) << "Start admin server failed"; + return nullptr; } - if (storageServer_) { - storageServer_->stop(); +} + +std::unique_ptr StorageServer::getInternalServer() { + try { + auto handler = std::make_shared(env_.get()); + auto internalAddr = Utils::getInternalAddrFromStoreAddr(localHost_); + auto server = std::make_unique(); + server->setPort(internalAddr.port); + server->setIdleTimeout(std::chrono::seconds(0)); + server->setIOThreadPool(ioThreadPool_); + server->setThreadManager(workers_); + if (FLAGS_enable_ssl) { + server->setSSLConfig(nebula::sslContextConfig()); + } + server->setInterface(std::move(handler)); + server->setup(); + return server; + } catch (const std::exception& e) { + LOG(ERROR) << "Start internal storage server failed: " << e.what(); + return nullptr; + } catch (...) 
{ + LOG(ERROR) << "Start internal storage server failed"; + return nullptr; } } diff --git a/src/storage/StorageServer.h b/src/storage/StorageServer.h index 1cd7ab61019..94d52569d6a 100644 --- a/src/storage/StorageServer.h +++ b/src/storage/StorageServer.h @@ -19,11 +19,10 @@ #include "storage/GraphStorageLocalServer.h" #include "storage/admin/AdminTaskManager.h" #include "storage/transaction/TransactionManager.h" +#include "webservice/WebService.h" namespace nebula { -class WebService; - namespace storage { class StorageServer final { @@ -34,8 +33,6 @@ class StorageServer final { std::string walPath = "", std::string listenerPath = ""); - ~StorageServer(); - // Return false if failed. bool start(); @@ -68,6 +65,26 @@ class StorageServer final { int32_t getAdminStoreSeqId(); bool initWebService(); + + /** + * @brief storage thrift server, mainly for graph query + */ +#ifndef BUILD_STANDALONE + std::unique_ptr getStorageServer(); +#else + std::shared_ptr getStorageServer(); +#endif + + /** + * @brief admin thrift server, mainly for meta to control storage + */ + std::unique_ptr getAdminServer(); + + /** + * @brief internal thrift server, mainly for toss now + */ + std::unique_ptr getInternalServer(); + /** * @brief used by all thrift client, and kvstore. * default num is 16 @@ -75,20 +92,12 @@ class StorageServer final { std::shared_ptr ioThreadPool_; std::shared_ptr workers_; - std::unique_ptr storageThread_; - std::unique_ptr adminThread_; - std::atomic storageSvcStatus_{STATUS_UNINITIALIZED}; - std::atomic adminSvcStatus_{STATUS_UNINITIALIZED}; - #ifndef BUILD_STANDALONE std::unique_ptr storageServer_; #else std::shared_ptr storageServer_; #endif std::unique_ptr adminServer_; - - std::unique_ptr internalStorageThread_; - std::atomic internalStorageSvcStatus_{STATUS_UNINITIALIZED}; std::unique_ptr internalStorageServer_; std::unique_ptr webSvc_;
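The three storage-side servers above now share the same lifecycle as the raft service: a factory builds the thrift server, setup() binds the port and starts serving without a dedicated thread, and stop() calls cleanUp() instead of joining a serve thread. A condensed stand-alone sketch of that pattern (assumes a plain fbthrift ThriftServer; the handler type and function names are illustrative, not the actual storage handlers):

```cpp
// Hypothetical sketch of the setup()/cleanUp() lifecycle this patch adopts,
// replacing the old "serve() on a dedicated std::thread plus status flags" model.
#include <memory>

#include <glog/logging.h>
#include <thrift/lib/cpp2/server/ThriftServer.h>

std::unique_ptr<apache::thrift::ThriftServer> startThriftServer(
    std::shared_ptr<apache::thrift::AsyncProcessorFactory> handler, uint16_t port) {
  try {
    auto server = std::make_unique<apache::thrift::ThriftServer>();
    server->setPort(port);
    server->setInterface(std::move(handler));
    server->setup();  // binds the port and starts accepting; returns immediately
    LOG(INFO) << "Serving on port " << server->getAddress().getPort();
    return server;
  } catch (const std::exception& e) {
    LOG(ERROR) << "Start thrift server failed: " << e.what();
    return nullptr;  // callers treat nullptr as start failure, as in this patch
  }
}

void stopThriftServer(std::unique_ptr<apache::thrift::ThriftServer>& server) {
  if (server) {
    server->cleanUp();  // stop listening and drain; there is no serve thread to join
    server.reset();
  }
}
```

Because setup() already runs the accept loop on the configured IO pool, the per-server threads, the ServiceStatus atomics, and the busy-wait in start() become unnecessary, which is what the removals above amount to.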