From 612c56436a61eacd5a9fd96a0cfc8c17727db1e3 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Thu, 24 Nov 2022 15:14:03 +0800 Subject: [PATCH 1/2] minor fix --- src/kvstore/Listener.cpp | 11 +++----- src/kvstore/Listener.h | 27 +++++++++---------- src/kvstore/NebulaStore.cpp | 13 +++++---- src/kvstore/NebulaStore.h | 6 +++-- .../plugins/elasticsearch/ESListener.h | 11 +------- src/kvstore/test/NebulaListenerTest.cpp | 11 +------- 6 files changed, 29 insertions(+), 50 deletions(-) diff --git a/src/kvstore/Listener.cpp b/src/kvstore/Listener.cpp index a989e128672..2bccddd26dd 100644 --- a/src/kvstore/Listener.cpp +++ b/src/kvstore/Listener.cpp @@ -23,10 +23,7 @@ Listener::Listener(GraphSpaceID spaceId, const std::string& walPath, std::shared_ptr ioPool, std::shared_ptr workers, - std::shared_ptr handlers, - std::shared_ptr snapshotMan, - std::shared_ptr clientMan, - std::shared_ptr diskMan) + std::shared_ptr handlers) : RaftPart(FLAGS_cluster_id, spaceId, partId, @@ -35,9 +32,9 @@ Listener::Listener(GraphSpaceID spaceId, ioPool, workers, handlers, - snapshotMan, - clientMan, - diskMan) {} + nullptr, + nullptr, + nullptr) {} void Listener::start(std::vector&& peers, bool) { std::lock_guard g(raftLock_); diff --git a/src/kvstore/Listener.h b/src/kvstore/Listener.h index 5aa4640c001..3391811b02f 100644 --- a/src/kvstore/Listener.h +++ b/src/kvstore/Listener.h @@ -77,15 +77,17 @@ using RaftClient = thrift::ThriftClientManager lastCommittedLogId() * * // read last apply id from external storage, used in initialization * LogID lastApplyLogId() * - * // apply the kv to state machine - * bool apply(const std::vector& data) - * * // persist last commit log id/term and lastApplyId * bool persist(LogID, TermID, LogID) */ @@ -101,10 +103,6 @@ class Listener : public raftex::RaftPart { * @param ioPool IOThreadPool for listener * @param workers Background thread for listener * @param handlers Worker thread for listener - * @param snapshotMan Snapshot manager - * @param clientMan Client manager - * @param diskMan Disk manager - * @param schemaMan Schema manager */ Listener(GraphSpaceID spaceId, PartitionID partId, @@ -112,10 +110,7 @@ class Listener : public raftex::RaftPart { const std::string& walPath, std::shared_ptr ioPool, std::shared_ptr workers, - std::shared_ptr handlers, - std::shared_ptr snapshotMan, - std::shared_ptr clientMan, - std::shared_ptr diskMan); + std::shared_ptr handlers); /** * @brief Initialize listener, all Listener must call this method @@ -185,6 +180,13 @@ class Listener : public raftex::RaftPart { */ virtual bool persist(LogID commitLogId, TermID commitLogTerm, LogID lastApplyLogId) = 0; + /** + * @brief Main interface to process logs, listener need to apply the committed log entry to their + * state machine. Once apply succeeded, user should call persist() to make their progress + * persisted. + */ + virtual void processLogs() = 0; + /** * @brief Callback when a raft node lost leadership on term, should not happen in listener * @@ -269,9 +271,6 @@ class Listener : public raftex::RaftPart { */ void doApply(); - // Process logs and then call apply to execute - virtual void processLogs() = 0; - protected: LogID leaderCommitId_ = 0; LogID lastApplyLogId_ = 0; diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 4fe9e8ff28e..566ff7714ab 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -582,10 +582,7 @@ void NebulaStore::removeListenerSpace(GraphSpaceID spaceId, meta::cpp2::Listener folly::RWSpinLock::WriteHolder wh(&lock_); auto spaceIt = this->spaceListeners_.find(spaceId); if (spaceIt != this->spaceListeners_.end()) { - for (const auto& partEntry : spaceIt->second->listeners_) { - CHECK(partEntry.second.empty()); - } - this->spaceListeners_.erase(spaceIt); + // Perform extra destruction of given type of listener here; } LOG(INFO) << "Listener space " << spaceId << " has been removed!"; } @@ -609,7 +606,7 @@ void NebulaStore::addListenerPart(GraphSpaceID spaceId, << " of [Space: " << spaceId << ", Part: " << partId << "] has existed!"; return; } - partIt->second.emplace(type, newListener(spaceId, partId, std::move(type), peers)); + partIt->second.emplace(type, newListener(spaceId, partId, type, peers)); LOG(INFO) << "Listener of type " << apache::thrift::util::enumNameSafe(type) << " of [Space: " << spaceId << ", Part: " << partId << "] is added"; return; @@ -619,10 +616,12 @@ std::shared_ptr NebulaStore::newListener(GraphSpaceID spaceId, PartitionID partId, meta::cpp2::ListenerType type, const std::vector& peers) { + // Lock has been acquired in addListenerPart. + // todo(doodle): we don't support start multiple type of listener in same process for now. If we + // suppport it later, the wal path may or may not need to be separated depending on how we + // implement it. auto walPath = folly::stringPrintf("%s/%d/%d/wal", options_.listenerPath_.c_str(), spaceId, partId); - // snapshot manager and client manager is set to nullptr, listener should - // never use them std::shared_ptr listener; if (type == meta::cpp2::ListenerType::ELASTICSEARCH) { listener = std::make_shared( diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 3439991ab8f..276a4751d38 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -621,7 +621,8 @@ class NebulaStore : public KVStore, public Handler { const std::vector& files) override; /** - * @brief Add a space as listener + * @brief Add a specified type listener to space. Perform extra initialization of given type + * listener if necessary. User should call addListenerSpace first then addListenerPart. * * @param spaceId * @param type Listener type @@ -629,7 +630,8 @@ class NebulaStore : public KVStore, public Handler { void addListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) override; /** - * @brief Remove a listener space + * @brief Remove a specified type listener from space. Perform extra destruction of given type + * listener if necessary. User should call removeListenerPart first then removeListenerSpace. * * @param spaceId * @param type Listener type diff --git a/src/kvstore/plugins/elasticsearch/ESListener.h b/src/kvstore/plugins/elasticsearch/ESListener.h index 2f82943e8f4..f62ed55c8c1 100644 --- a/src/kvstore/plugins/elasticsearch/ESListener.h +++ b/src/kvstore/plugins/elasticsearch/ESListener.h @@ -40,16 +40,7 @@ class ESListener : public Listener { std::shared_ptr workers, std::shared_ptr handlers, meta::SchemaManager* schemaMan) - : Listener(spaceId, - partId, - std::move(localAddr), - walPath, - ioPool, - workers, - handlers, - nullptr, - nullptr, - nullptr), + : Listener(spaceId, partId, std::move(localAddr), walPath, ioPool, workers, handlers), schemaMan_(schemaMan) { CHECK(!!schemaMan); lastApplyLogFile_ = std::make_unique( diff --git a/src/kvstore/test/NebulaListenerTest.cpp b/src/kvstore/test/NebulaListenerTest.cpp index 9ef87dc40ed..816c2093e80 100644 --- a/src/kvstore/test/NebulaListenerTest.cpp +++ b/src/kvstore/test/NebulaListenerTest.cpp @@ -41,16 +41,7 @@ class DummyListener : public Listener { std::shared_ptr ioPool, std::shared_ptr workers, std::shared_ptr handlers) - : Listener(spaceId, - partId, - localAddr, - walPath, - ioPool, - workers, - handlers, - nullptr, - nullptr, - nullptr) {} + : Listener(spaceId, partId, localAddr, walPath, ioPool, workers, handlers) {} std::vector data() { return data_; From 98386eb47c88f779b7912df2a1d07211b2d18c55 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Thu, 24 Nov 2022 19:48:20 +0800 Subject: [PATCH 2/2] remove outdate comments --- src/kvstore/plugins/elasticsearch/ESListener.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/kvstore/plugins/elasticsearch/ESListener.h b/src/kvstore/plugins/elasticsearch/ESListener.h index f62ed55c8c1..19c9fd8c73f 100644 --- a/src/kvstore/plugins/elasticsearch/ESListener.h +++ b/src/kvstore/plugins/elasticsearch/ESListener.h @@ -27,9 +27,6 @@ class ESListener : public Listener { * @param ioPool IOThreadPool for listener * @param workers Background thread for listener * @param handlers Worker thread for listener - * @param snapshotMan Snapshot manager - * @param clientMan Client manager - * @param diskMan Disk manager * @param schemaMan Schema manager */ ESListener(GraphSpaceID spaceId,