Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor enhancement about listener #4931

Merged
merged 4 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ Listener::Listener(GraphSpaceID spaceId,
const std::string& walPath,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<thread::GenericThreadPool> workers,
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan,
std::shared_ptr<RaftClient> clientMan,
std::shared_ptr<DiskManager> diskMan)
std::shared_ptr<folly::Executor> handlers)
: RaftPart(FLAGS_cluster_id,
spaceId,
partId,
Expand All @@ -35,9 +32,9 @@ Listener::Listener(GraphSpaceID spaceId,
ioPool,
workers,
handlers,
snapshotMan,
clientMan,
diskMan) {}
nullptr,
nullptr,
nullptr) {}

void Listener::start(std::vector<HostAddr>&& peers, bool) {
std::lock_guard<std::mutex> g(raftLock_);
Expand Down
27 changes: 13 additions & 14 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,17 @@ using RaftClient = thrift::ThriftClientManager<raftex::cpp2::RaftexServiceAsyncC
* // extra initialize work could do here
* void init()
*
* // Main interface to process logs, listener need to apply the committed log entry to their
* // state machine. Once apply succeeded, user should call persist() to make their progress
* // persisted.
* virtual void processLogs() = 0;
Copy link
Contributor

@panda-sheep panda-sheep Nov 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a small question:
Why does the function comment here exist? Can it be removed?
Only leave functions, such as the above function, only need to leave
virtual void processLogs() = 0;

Otherwise, there are two copies of these comments, which feels a bit redundant?

*
* // read last commit log id and term from external storage, used in initialization
* std::pair<LogID, TermID> lastCommittedLogId()
*
* // read last apply id from external storage, used in initialization
* LogID lastApplyLogId()
*
* // apply the kv to state machine
* bool apply(const std::vector<KV>& data)
*
* // persist last commit log id/term and lastApplyId
* bool persist(LogID, TermID, LogID)
*/
Expand All @@ -101,21 +103,14 @@ class Listener : public raftex::RaftPart {
* @param ioPool IOThreadPool for listener
* @param workers Background thread for listener
* @param handlers Worker thread for listener
* @param snapshotMan Snapshot manager
* @param clientMan Client manager
* @param diskMan Disk manager
* @param schemaMan Schema manager
*/
Listener(GraphSpaceID spaceId,
PartitionID partId,
HostAddr localAddr,
const std::string& walPath,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<thread::GenericThreadPool> workers,
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan,
std::shared_ptr<RaftClient> clientMan,
std::shared_ptr<DiskManager> diskMan);
std::shared_ptr<folly::Executor> handlers);

/**
* @brief Initialize listener, all Listener must call this method
Expand Down Expand Up @@ -185,6 +180,13 @@ class Listener : public raftex::RaftPart {
*/
virtual bool persist(LogID commitLogId, TermID commitLogTerm, LogID lastApplyLogId) = 0;

/**
* @brief Main interface to process logs, listener need to apply the committed log entry to their
* state machine. Once apply succeeded, user should call persist() to make their progress
* persisted.
*/
virtual void processLogs() = 0;

/**
* @brief Callback when a raft node lost leadership on term, should not happen in listener
*
Expand Down Expand Up @@ -269,9 +271,6 @@ class Listener : public raftex::RaftPart {
*/
void doApply();

// Process logs and then call apply to execute
virtual void processLogs() = 0;

protected:
LogID leaderCommitId_ = 0;
LogID lastApplyLogId_ = 0;
Expand Down
13 changes: 6 additions & 7 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,10 +582,7 @@ void NebulaStore::removeListenerSpace(GraphSpaceID spaceId, meta::cpp2::Listener
folly::RWSpinLock::WriteHolder wh(&lock_);
auto spaceIt = this->spaceListeners_.find(spaceId);
if (spaceIt != this->spaceListeners_.end()) {
for (const auto& partEntry : spaceIt->second->listeners_) {
CHECK(partEntry.second.empty());
}
this->spaceListeners_.erase(spaceIt);
// Perform extra destruction of given type of listener here;
}
LOG(INFO) << "Listener space " << spaceId << " has been removed!";
}
Expand All @@ -609,7 +606,7 @@ void NebulaStore::addListenerPart(GraphSpaceID spaceId,
<< " of [Space: " << spaceId << ", Part: " << partId << "] has existed!";
return;
}
partIt->second.emplace(type, newListener(spaceId, partId, std::move(type), peers));
partIt->second.emplace(type, newListener(spaceId, partId, type, peers));
LOG(INFO) << "Listener of type " << apache::thrift::util::enumNameSafe(type)
<< " of [Space: " << spaceId << ", Part: " << partId << "] is added";
return;
Expand All @@ -619,10 +616,12 @@ std::shared_ptr<Listener> NebulaStore::newListener(GraphSpaceID spaceId,
PartitionID partId,
meta::cpp2::ListenerType type,
const std::vector<HostAddr>& peers) {
// Lock has been acquired in addListenerPart.
// todo(doodle): we don't support start multiple type of listener in same process for now. If we
// suppport it later, the wal path may or may not need to be separated depending on how we
// implement it.
auto walPath =
folly::stringPrintf("%s/%d/%d/wal", options_.listenerPath_.c_str(), spaceId, partId);
// snapshot manager and client manager is set to nullptr, listener should
// never use them
std::shared_ptr<Listener> listener;
if (type == meta::cpp2::ListenerType::ELASTICSEARCH) {
listener = std::make_shared<ESListener>(
Expand Down
6 changes: 4 additions & 2 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -621,15 +621,17 @@ class NebulaStore : public KVStore, public Handler {
const std::vector<std::string>& files) override;

/**
* @brief Add a space as listener
* @brief Add a specified type listener to space. Perform extra initialization of given type
* listener if necessary. User should call addListenerSpace first then addListenerPart.
*
* @param spaceId
* @param type Listener type
*/
void addListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) override;

/**
* @brief Remove a listener space
* @brief Remove a specified type listener from space. Perform extra destruction of given type
* listener if necessary. User should call removeListenerPart first then removeListenerSpace.
*
* @param spaceId
* @param type Listener type
Expand Down
14 changes: 1 addition & 13 deletions src/kvstore/plugins/elasticsearch/ESListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ class ESListener : public Listener {
* @param ioPool IOThreadPool for listener
* @param workers Background thread for listener
* @param handlers Worker thread for listener
* @param snapshotMan Snapshot manager
* @param clientMan Client manager
* @param diskMan Disk manager
* @param schemaMan Schema manager
*/
ESListener(GraphSpaceID spaceId,
Expand All @@ -40,16 +37,7 @@ class ESListener : public Listener {
std::shared_ptr<thread::GenericThreadPool> workers,
std::shared_ptr<folly::Executor> handlers,
meta::SchemaManager* schemaMan)
: Listener(spaceId,
partId,
std::move(localAddr),
walPath,
ioPool,
workers,
handlers,
nullptr,
nullptr,
nullptr),
: Listener(spaceId, partId, std::move(localAddr), walPath, ioPool, workers, handlers),
schemaMan_(schemaMan) {
CHECK(!!schemaMan);
lastApplyLogFile_ = std::make_unique<std::string>(
Expand Down
11 changes: 1 addition & 10 deletions src/kvstore/test/NebulaListenerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,7 @@ class DummyListener : public Listener {
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<thread::GenericThreadPool> workers,
std::shared_ptr<folly::Executor> handlers)
: Listener(spaceId,
partId,
localAddr,
walPath,
ioPool,
workers,
handlers,
nullptr,
nullptr,
nullptr) {}
: Listener(spaceId, partId, localAddr, walPath, ioPool, workers, handlers) {}

std::vector<KV> data() {
return data_;
Expand Down