diff --git a/src/kvstore/Listener.cpp b/src/kvstore/Listener.cpp index a989e128672..2bccddd26dd 100644 --- a/src/kvstore/Listener.cpp +++ b/src/kvstore/Listener.cpp @@ -23,10 +23,7 @@ Listener::Listener(GraphSpaceID spaceId, const std::string& walPath, std::shared_ptr ioPool, std::shared_ptr workers, - std::shared_ptr handlers, - std::shared_ptr snapshotMan, - std::shared_ptr clientMan, - std::shared_ptr diskMan) + std::shared_ptr handlers) : RaftPart(FLAGS_cluster_id, spaceId, partId, @@ -35,9 +32,9 @@ Listener::Listener(GraphSpaceID spaceId, ioPool, workers, handlers, - snapshotMan, - clientMan, - diskMan) {} + nullptr, + nullptr, + nullptr) {} void Listener::start(std::vector&& peers, bool) { std::lock_guard g(raftLock_); diff --git a/src/kvstore/Listener.h b/src/kvstore/Listener.h index 5aa4640c001..3391811b02f 100644 --- a/src/kvstore/Listener.h +++ b/src/kvstore/Listener.h @@ -77,15 +77,17 @@ using RaftClient = thrift::ThriftClientManager lastCommittedLogId() * * // read last apply id from external storage, used in initialization * LogID lastApplyLogId() * - * // apply the kv to state machine - * bool apply(const std::vector& data) - * * // persist last commit log id/term and lastApplyId * bool persist(LogID, TermID, LogID) */ @@ -101,10 +103,6 @@ class Listener : public raftex::RaftPart { * @param ioPool IOThreadPool for listener * @param workers Background thread for listener * @param handlers Worker thread for listener - * @param snapshotMan Snapshot manager - * @param clientMan Client manager - * @param diskMan Disk manager - * @param schemaMan Schema manager */ Listener(GraphSpaceID spaceId, PartitionID partId, @@ -112,10 +110,7 @@ class Listener : public raftex::RaftPart { const std::string& walPath, std::shared_ptr ioPool, std::shared_ptr workers, - std::shared_ptr handlers, - std::shared_ptr snapshotMan, - std::shared_ptr clientMan, - std::shared_ptr diskMan); + std::shared_ptr handlers); /** * @brief Initialize listener, all Listener must call this method @@ -185,6 +180,13 @@ class Listener : public raftex::RaftPart { */ virtual bool persist(LogID commitLogId, TermID commitLogTerm, LogID lastApplyLogId) = 0; + /** + * @brief Main interface to process logs, listener need to apply the committed log entry to their + * state machine. Once apply succeeded, user should call persist() to make their progress + * persisted. + */ + virtual void processLogs() = 0; + /** * @brief Callback when a raft node lost leadership on term, should not happen in listener * @@ -269,9 +271,6 @@ class Listener : public raftex::RaftPart { */ void doApply(); - // Process logs and then call apply to execute - virtual void processLogs() = 0; - protected: LogID leaderCommitId_ = 0; LogID lastApplyLogId_ = 0; diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 4fe9e8ff28e..566ff7714ab 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -582,10 +582,7 @@ void NebulaStore::removeListenerSpace(GraphSpaceID spaceId, meta::cpp2::Listener folly::RWSpinLock::WriteHolder wh(&lock_); auto spaceIt = this->spaceListeners_.find(spaceId); if (spaceIt != this->spaceListeners_.end()) { - for (const auto& partEntry : spaceIt->second->listeners_) { - CHECK(partEntry.second.empty()); - } - this->spaceListeners_.erase(spaceIt); + // Perform extra destruction of given type of listener here; } LOG(INFO) << "Listener space " << spaceId << " has been removed!"; } @@ -609,7 +606,7 @@ void NebulaStore::addListenerPart(GraphSpaceID spaceId, << " of [Space: " << spaceId << ", Part: " << partId << "] has existed!"; return; } - partIt->second.emplace(type, newListener(spaceId, partId, std::move(type), peers)); + partIt->second.emplace(type, newListener(spaceId, partId, type, peers)); LOG(INFO) << "Listener of type " << apache::thrift::util::enumNameSafe(type) << " of [Space: " << spaceId << ", Part: " << partId << "] is added"; return; @@ -619,10 +616,12 @@ std::shared_ptr NebulaStore::newListener(GraphSpaceID spaceId, PartitionID partId, meta::cpp2::ListenerType type, const std::vector& peers) { + // Lock has been acquired in addListenerPart. + // todo(doodle): we don't support start multiple type of listener in same process for now. If we + // suppport it later, the wal path may or may not need to be separated depending on how we + // implement it. auto walPath = folly::stringPrintf("%s/%d/%d/wal", options_.listenerPath_.c_str(), spaceId, partId); - // snapshot manager and client manager is set to nullptr, listener should - // never use them std::shared_ptr listener; if (type == meta::cpp2::ListenerType::ELASTICSEARCH) { listener = std::make_shared( diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 3439991ab8f..276a4751d38 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -621,7 +621,8 @@ class NebulaStore : public KVStore, public Handler { const std::vector& files) override; /** - * @brief Add a space as listener + * @brief Add a specified type listener to space. Perform extra initialization of given type + * listener if necessary. User should call addListenerSpace first then addListenerPart. * * @param spaceId * @param type Listener type @@ -629,7 +630,8 @@ class NebulaStore : public KVStore, public Handler { void addListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) override; /** - * @brief Remove a listener space + * @brief Remove a specified type listener from space. Perform extra destruction of given type + * listener if necessary. User should call removeListenerPart first then removeListenerSpace. * * @param spaceId * @param type Listener type diff --git a/src/kvstore/plugins/elasticsearch/ESListener.h b/src/kvstore/plugins/elasticsearch/ESListener.h index 2f82943e8f4..19c9fd8c73f 100644 --- a/src/kvstore/plugins/elasticsearch/ESListener.h +++ b/src/kvstore/plugins/elasticsearch/ESListener.h @@ -27,9 +27,6 @@ class ESListener : public Listener { * @param ioPool IOThreadPool for listener * @param workers Background thread for listener * @param handlers Worker thread for listener - * @param snapshotMan Snapshot manager - * @param clientMan Client manager - * @param diskMan Disk manager * @param schemaMan Schema manager */ ESListener(GraphSpaceID spaceId, @@ -40,16 +37,7 @@ class ESListener : public Listener { std::shared_ptr workers, std::shared_ptr handlers, meta::SchemaManager* schemaMan) - : Listener(spaceId, - partId, - std::move(localAddr), - walPath, - ioPool, - workers, - handlers, - nullptr, - nullptr, - nullptr), + : Listener(spaceId, partId, std::move(localAddr), walPath, ioPool, workers, handlers), schemaMan_(schemaMan) { CHECK(!!schemaMan); lastApplyLogFile_ = std::make_unique( diff --git a/src/kvstore/test/NebulaListenerTest.cpp b/src/kvstore/test/NebulaListenerTest.cpp index 9ef87dc40ed..816c2093e80 100644 --- a/src/kvstore/test/NebulaListenerTest.cpp +++ b/src/kvstore/test/NebulaListenerTest.cpp @@ -41,16 +41,7 @@ class DummyListener : public Listener { std::shared_ptr ioPool, std::shared_ptr workers, std::shared_ptr handlers) - : Listener(spaceId, - partId, - localAddr, - walPath, - ioPool, - workers, - handlers, - nullptr, - nullptr, - nullptr) {} + : Listener(spaceId, partId, localAddr, walPath, ioPool, workers, handlers) {} std::vector data() { return data_;