diff --git a/src/common/base/CollectNSucceeded.inl b/src/common/base/CollectNSucceeded.inl
index ca8d2a71646..89d2f1b9bb7 100644
--- a/src/common/base/CollectNSucceeded.inl
+++ b/src/common/base/CollectNSucceeded.inl
@@ -16,6 +16,9 @@ folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(
         size_t n,
         ResultEval&& eval) {
     using Result = SucceededResultList<FutureIter>;
+    if (n == 0) {
+        return folly::Future<Result>(Result());
+    }
 
     struct Context {
         Context(size_t total, ResultEval&& e)
@@ -30,7 +33,6 @@ folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(
     };
 
     size_t total = size_t(std::distance(first, last));
-    DCHECK_GT(n, 0U);
     DCHECK_GE(total, 0U);
 
     if (total < n) {
@@ -45,11 +47,11 @@ folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(
     // for each succeeded Future, add to the result list, until
     // we have required number of futures, at which point we fulfil
     // the promise with the result list
-    for (; first != last; ++first) {
-        first->setCallback_([n, ctx] (
+    for (size_t index = 0; first != last; ++first, ++index) {
+        first->setCallback_([n, ctx, index] (
                 folly::Try<FutureReturnType<FutureIter>>&& t) {
             if (!ctx->promise.isFulfilled()) {
-                if (!t.hasException() && ctx->eval(t.value())) {
+                if (!t.hasException() && ctx->eval(index, t.value())) {
                     ctx->results.emplace_back(std::move(t.value()));
                 }
                 if ((++ctx->numCompleted) == ctx->nTotal ||
diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp
index 147df78b712..d6d8fb29da6 100644
--- a/src/kvstore/raftex/Host.cpp
+++ b/src/kvstore/raftex/Host.cpp
@@ -22,9 +22,10 @@ namespace raftex {
 
 using nebula::network::NetworkUtils;
 
-Host::Host(const HostAddr& addr, std::shared_ptr<RaftPart> part)
+Host::Host(const HostAddr& addr, std::shared_ptr<RaftPart> part, bool isLearner)
         : part_(std::move(part))
         , addr_(addr)
+        , isLearner_(isLearner)
         , idStr_(folly::stringPrintf(
             "[Host: %s:%d] ",
             NetworkUtils::intToIPv4(addr_.first).c_str(),
@@ -39,6 +40,7 @@ void Host::waitForStop() {
     noMoreRequestCV_.wait(g, [this] {
         return !requestOnGoing_;
     });
+    LOG(INFO) << idStr_ << "The host has been stopped!";
 }
 
 
@@ -61,6 +63,17 @@ cpp2::ErrorCode Host::checkStatus(std::lock_guard<std::mutex>& lck) const {
 
 folly::Future<cpp2::AskForVoteResponse> Host::askForVote(
         const cpp2::AskForVoteRequest& req) {
+    {
+        std::lock_guard<std::mutex> g(lock_);
+        auto res = checkStatus(g);
+        if (res != cpp2::ErrorCode::SUCCEEDED) {
+            VLOG(2) << idStr_
+                    << "The Host is not in a proper status, do not send";
+            cpp2::AskForVoteResponse resp;
+            resp.set_error_code(res);
+            return resp;
+        }
+    }
     auto client = tcManager().client(addr_);
     return client->future_askForVote(req);
 }
diff --git a/src/kvstore/raftex/Host.h b/src/kvstore/raftex/Host.h
index c8d4e412499..f27e8906ca6 100644
--- a/src/kvstore/raftex/Host.h
+++ b/src/kvstore/raftex/Host.h
@@ -25,7 +25,11 @@ class RaftPart;
 class Host final : public std::enable_shared_from_this<Host> {
 public:
-    Host(const HostAddr& addr, std::shared_ptr<RaftPart> part);
+    Host(const HostAddr& addr, std::shared_ptr<RaftPart> part, bool isLearner = false);
+
+    ~Host() {
+        LOG(INFO) << idStr_ << " The host has been destroyed!";
+    }
 
     const char* idStr() const {
         return idStr_.c_str();
@@ -50,6 +54,10 @@ class Host final : public std::enable_shared_from_this<Host> {
 
     void waitForStop();
 
+    bool isLearner() const {
+        return isLearner_;
+    }
+
     folly::Future<cpp2::AskForVoteResponse> askForVote(
         const cpp2::AskForVoteRequest& req);
 
@@ -84,9 +92,9 @@ class Host final : public std::enable_shared_from_this<Host> {
 private:
     std::shared_ptr<RaftPart> part_;
     const HostAddr addr_;
+    bool isLearner_ = false;
     const std::string idStr_;
-
     mutable std::mutex lock_;
 
     bool paused_{false};
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index c1b32aa371d..fd55765cc10 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -205,6 +205,7 @@ RaftPart::~RaftPart() {
     // Make sure the partition has stopped
     CHECK(status_ == Status::STOPPED);
+    LOG(INFO) << idStr_ << " The part has been destroyed...";
 }
 
 
@@ -216,6 +217,8 @@ const char* RaftPart::roleStr(Role role) const {
         return "Follower";
     case Role::CANDIDATE:
         return "Candidate";
+    case Role::LEARNER:
+        return "Learner";
     default:
         LOG(FATAL) << idStr_ << "Invalid role";
     }
@@ -223,32 +226,30 @@ const char* RaftPart::roleStr(Role role) const {
 
 
-void RaftPart::start(std::vector<HostAddr>&& peers) {
+void RaftPart::start(std::vector<HostAddr>&& peers, bool asLearner) {
     std::lock_guard<std::mutex> g(raftLock_);
 
     // Set the quorum number
     quorum_ = (peers.size() + 1) / 2;
-    VLOG(2) << idStr_ << "There are "
-            << peers.size()
-            << " peer hosts, and total "
-            << peers.size() + 1
-            << " copies. The quorum is " << quorum_ + 1;
+    LOG(INFO) << idStr_ << "There are "
+              << peers.size()
+              << " peer hosts, and total "
+              << peers.size() + 1
+              << " copies. The quorum is " << quorum_ + 1;
 
     committedLogId_ = lastCommittedLogId();
 
     // Start all peer hosts
-    peerHosts_ = std::make_shared<
-        std::unordered_map<HostAddr, std::shared_ptr<Host>>
-    >();
     for (auto& addr : peers) {
-        peerHosts_->emplace(
-            addr,
-            std::make_shared<Host>(addr, shared_from_this()));
+        auto hostPtr = std::make_shared<Host>(addr, shared_from_this());
+        hosts_.emplace_back(hostPtr);
     }
 
     // Change the status
     status_ = Status::RUNNING;
-
+    if (asLearner) {
+        role_ = Role::LEARNER;
+    }
     // Set up a leader election task
     size_t delayMS = 100 + folly::Random::rand32(900);
     workers_->addDelayTask(delayMS, [self = shared_from_this()] {
@@ -260,27 +261,27 @@ void RaftPart::start(std::vector<HostAddr>&& peers) {
 
 void RaftPart::stop() {
     VLOG(2) << idStr_ << "Stopping the partition";
 
-    decltype(peerHosts_) hosts;
+    decltype(hosts_) hosts;
     {
         std::unique_lock<std::mutex> lck(raftLock_);
         status_ = Status::STOPPED;
-        hosts = std::move(peerHosts_);
+        hosts = std::move(hosts_);
     }
 
-    for (auto& h : *hosts) {
-        h.second->stop();
+    for (auto& h : hosts) {
+        h->stop();
     }
     VLOG(2) << idStr_ << "Invoked stop() on all peer hosts";
 
-    for (auto& h : *hosts) {
-        VLOG(2) << idStr_ << "Waiting " << h.second->idStr() << " to stop";
-        h.second->waitForStop();
-        VLOG(2) << idStr_ << h.second->idStr() << "has stopped";
+    for (auto& h : hosts) {
+        VLOG(2) << idStr_ << "Waiting " << h->idStr() << " to stop";
+        h->waitForStop();
+        VLOG(2) << idStr_ << h->idStr() << "has stopped";
     }
     VLOG(2) << idStr_ << "All hosts are stopped";
 
-    VLOG(2) << idStr_ << "Partition has been stopped";
+    LOG(INFO) << idStr_ << "Partition has been stopped";
 }
 
 
@@ -303,6 +304,10 @@ typename RaftPart::AppendLogResult RaftPart::canAppendLogs(
     return AppendLogResult::SUCCEEDED;
 }
 
+void RaftPart::addLearner(HostAddr addr) {
+    std::lock_guard<std::mutex> lck(raftLock_);
+    hosts_.emplace_back(std::make_shared<Host>(addr, shared_from_this(), true));
+}
 
 folly::Future<RaftPart::AppendLogResult> RaftPart::appendAsync(ClusterID source,
                                                                std::string log) {
@@ -452,7 +457,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
                              LogID prevLogId) {
     using namespace folly;  // NOLINT since the fancy overload of | operator
 
-    decltype(peerHosts_) hosts;
+    decltype(hosts_) hosts;
     {
         std::lock_guard<std::mutex> g(raftLock_);
 
@@ -476,56 +481,41 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
             return;
         }
 
-        hosts = peerHosts_;
+        hosts = hosts_;
     }
 
     VLOG(2) << idStr_ << "About to replicate logs to all peer hosts";
 
-    if (!hosts || hosts->empty()) {
-        // No peer
-        VLOG(2) << idStr_ << "The leader has no peer";
-        processAppendLogResponses(AppendLogResponses(),
-                                  eb,
-                                  std::move(iter),
-                                  currTerm,
-                                  lastLogId,
-                                  committedId,
-                                  prevLogTerm,
-                                  prevLogId);
-        return;
-    }
-
-    using PeerHostEntry = typename decltype(peerHosts_)::element_type::value_type;
     collectNSucceeded(
-        gen::from(*hosts)
+        gen::from(hosts)
         | gen::map([self = shared_from_this(),
                     eb,
                     currTerm,
                     lastLogId,
                     prevLogId,
                     prevLogTerm,
-                    committedId] (PeerHostEntry& host) {
+                    committedId] (std::shared_ptr<Host> hostPtr) {
              VLOG(2) << self->idStr_
                      << "Appending logs to "
-                     << NetworkUtils::intToIPv4(host.first.first)
-                     << ":" << host.first.second;
+                     << hostPtr->idStr();
             return via(
                 eb,
                 [=] () -> Future<cpp2::AppendLogResponse> {
-                    return host.second->appendLogs(eb,
-                                                   currTerm,
-                                                   lastLogId,
-                                                   committedId,
-                                                   prevLogTerm,
-                                                   prevLogId);
+                    return hostPtr->appendLogs(eb,
+                                               currTerm,
+                                               lastLogId,
+                                               committedId,
+                                               prevLogTerm,
+                                               prevLogId);
                 });
         })
        | gen::as<std::vector>(),
       // Number of succeeded required
       quorum_,
       // Result evaluator
-      [](cpp2::AppendLogResponse& resp) {
-          return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED;
+      [hosts] (size_t index, cpp2::AppendLogResponse& resp) {
+          return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED
+                 && !hosts[index]->isLearner();
       })
       .then(eb, [self = shared_from_this(),
                  eb,
@@ -534,7 +524,8 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
                  lastLogId,
                  committedId,
                  prevLogId,
-                 prevLogTerm] (folly::Try<AppendLogResponses>&& result) mutable {
+                 prevLogTerm,
+                 pHosts = std::move(hosts)] (folly::Try<AppendLogResponses>&& result) mutable {
           VLOG(2) << self->idStr_ << "Received enough response";
           CHECK(!result.hasException());
@@ -545,7 +536,8 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
                                              lastLogId,
                                              committedId,
                                              prevLogTerm,
-                                             prevLogId);
+                                             prevLogId,
+                                             std::move(pHosts));
 
           return *result;
       });
@@ -560,11 +552,13 @@ void RaftPart::processAppendLogResponses(
         LogID lastLogId,
         LogID committedId,
         TermID prevLogTerm,
-        LogID prevLogId) {
+        LogID prevLogId,
+        std::vector<std::shared_ptr<Host>> hosts) {
     // Make sure majority have succeeded
     size_t numSucceeded = 0;
+    size_t index = 0;
     for (auto& res : resps) {
-        if (res.get_error_code() == cpp2::ErrorCode::SUCCEEDED) {
+        if (!hosts[index++]->isLearner() && res.get_error_code() == cpp2::ErrorCode::SUCCEEDED) {
             ++numSucceeded;
         }
     }
@@ -672,7 +666,7 @@ bool RaftPart::needToStartElection() {
 
 bool RaftPart::prepareElectionRequest(
         cpp2::AskForVoteRequest& req,
-        std::shared_ptr<std::unordered_map<HostAddr, std::shared_ptr<Host>>>& hosts) {
+        std::vector<std::shared_ptr<Host>>& hosts) {
     std::lock_guard<std::mutex> g(raftLock_);
 
     // Make sure the partition is running
@@ -695,7 +689,7 @@ bool RaftPart::prepareElectionRequest(
     req.set_last_log_id(lastLogId_);
     req.set_last_log_term(lastLogTerm_);
 
-    hosts = peerHosts_;
+    hosts = followers();
 
     return true;
 }
@@ -744,7 +738,7 @@ bool RaftPart::leaderElection() {
     using namespace folly;  // NOLINT since the fancy overload of | operator
 
     cpp2::AskForVoteRequest voteReq;
-    decltype(peerHosts_) hosts;
+    decltype(hosts_) hosts;
     if (!prepareElectionRequest(voteReq, hosts)) {
         return false;
     }
@@ -762,30 +756,28 @@ bool RaftPart::leaderElection() {
             << ")";
 
     auto resps = ElectionResponses();
-    if (!hosts || hosts->empty()) {
+    if (hosts.empty()) {
         VLOG(2) << idStr_ << "No peer found, I will be the leader";
     } else {
         auto eb = ioThreadPool_->getEventBase();
         auto futures = collectNSucceeded(
-            gen::from(*hosts)
-            | gen::map([eb, self = shared_from_this(), &voteReq] (
-                    decltype(peerHosts_)::element_type::value_type& host) {
+            gen::from(hosts)
+            | gen::map([eb, self = shared_from_this(), &voteReq] (auto& host) {
                   VLOG(2) << self->idStr_
                           << "Sending AskForVoteRequest to "
-                          << NetworkUtils::intToIPv4(host.first.first)
-                          << ":" << host.first.second;
+                          << host->idStr();
                   return via(
                       eb,
                       [&voteReq, &host] ()
                              -> Future<cpp2::AskForVoteResponse> {
-                         return host.second->askForVote(voteReq);
+                         return host->askForVote(voteReq);
                      });
              })
            | gen::as<std::vector>(),
            // Number of succeeded required
            quorum_,
            // Result evaluator
-            [](cpp2::AskForVoteResponse& resp) {
+            [](size_t, cpp2::AskForVoteResponse& resp) {
                return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED;
            });
@@ -831,6 +823,10 @@ bool RaftPart::leaderElection() {
                     << "No one is elected, continue the election";
             return false;
         }
+        case Role::LEARNER: {
+            LOG(FATAL) << idStr_ << " Impossible! There must be some bugs!";
+            return false;
+        }
     }
 
     LOG(FATAL) << "Should not reach here";
@@ -1130,6 +1126,7 @@ cpp2::ErrorCode RaftPart::verifyLeader(
     VLOG(2) << idStr_ << "The current role is " << roleStr(role_);
     UNUSED(lck);
     switch (role_) {
+        case Role::LEARNER:
         case Role::FOLLOWER: {
             if (req.get_current_term() == term_ &&
                 req.get_leader_ip() == leader_.first &&
@@ -1166,7 +1163,9 @@ cpp2::ErrorCode RaftPart::verifyLeader(
             << ":" << req.get_leader_port()
             << " [Term: " << req.get_current_term() << "]";
-    role_ = Role::FOLLOWER;
+    if (role_ != Role::LEARNER) {
+        role_ = Role::FOLLOWER;
+    }
     leader_ = std::make_pair(req.get_leader_ip(),
                              req.get_leader_port());
     term_ = proposedTerm_ = req.get_current_term();
@@ -1185,7 +1184,7 @@ folly::Future<RaftPart::AppendLogResult> RaftPart::sendHeartbeat() {
     TermID lastLogTerm = 0;
     LogID committed = 0;
 
-    decltype(peerHosts_) hosts;
+    decltype(hosts_) hosts;
     {
         std::lock_guard<std::mutex> g(raftLock_);
 
@@ -1219,41 +1218,33 @@ folly::Future<RaftPart::AppendLogResult> RaftPart::sendHeartbeat() {
         lastLogTerm = lastLogTerm_;
         committed = committedLogId_;
 
-        hosts = peerHosts_;
-    }
-
-    if (!hosts || hosts->empty()) {
-        VLOG(2) << idStr_ << "No peer to send the heartbeat";
-        doneHeartbeat();
-        return AppendLogResult::SUCCEEDED;
+        hosts = hosts_;
     }
 
     auto eb = ioThreadPool_->getEventBase();
 
-    using PeerHostEntry = typename decltype(peerHosts_)::element_type::value_type;
     return collectNSucceeded(
-        gen::from(*hosts)
-        | gen::map([=, self = shared_from_this()] (PeerHostEntry& host) {
+        gen::from(hosts)
+        | gen::map([=, self = shared_from_this()] (std::shared_ptr<Host> host) {
              VLOG(2) << self->idStr_ << "Send a heartbeat to "
-                     << NetworkUtils::intToIPv4(host.first.first)
-                     << ":" << host.first.second;
+                     << host->idStr();
             return via(
                 eb,
-                [=, &host] () -> Future<cpp2::AppendLogResponse> {
-                    return host.second->appendLogs(eb,
-                                                   term,
-                                                   lastLogId,
-                                                   committed,
-                                                   lastLogTerm,
-                                                   lastLogId);
+                [=] () -> Future<cpp2::AppendLogResponse> {
+                    return host->appendLogs(eb,
+                                            term,
+                                            lastLogId,
+                                            committed,
+                                            lastLogTerm,
+                                            lastLogId);
                 });
         })
       | gen::as<std::vector>(),
      // Number of succeeded required
      quorum_,
      // Result evaluator
-     [](cpp2::AppendLogResponse& resp) {
+     [](size_t, cpp2::AppendLogResponse& resp) {
         return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED;
     })
     .then([=, self = shared_from_this()] (
@@ -1301,6 +1292,18 @@ void RaftPart::doneHeartbeat() {
     }
 }
 
+std::vector<std::shared_ptr<Host>> RaftPart::followers() const {
+    CHECK(!raftLock_.try_lock());
+    decltype(hosts_) hosts;
+    for (auto& h : hosts_) {
+        if (!h->isLearner()) {
+            hosts.emplace_back(h);
+        }
+    }
+    return hosts;
+}
+
+
 }  // namespace raftex
 }  // namespace nebula
diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h
index 2fe5f0e755c..07f12542edd 100644
--- a/src/kvstore/raftex/RaftPart.h
+++ b/src/kvstore/raftex/RaftPart.h
@@ -93,9 +93,11 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
         return wal_;
     }
 
+    void addLearner(HostAddr learner);
+
     // Change the partition status to RUNNING. This is called
     // by the inherited class, when it's ready to server
-    virtual void start(std::vector<HostAddr>&& peers);
+    virtual void start(std::vector<HostAddr>&& peers, bool asLearner = false);
 
     // Change the partition status to STOPPED. This is called
     // by the inherited class, when it's about to stop
@@ -198,7 +200,9 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
     enum class Role {
         LEADER = 1,     // the leader
         FOLLOWER,       // following a leader
-        CANDIDATE       // Has sent AskForVote request
+        CANDIDATE,      // Has sent AskForVote request
+        LEARNER         // It is the same with FOLLOWER,
+                        // except it does not participate in leader election
     };
 
     // A list of
@@ -257,7 +261,7 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
     // return FALSE
     bool prepareElectionRequest(
         cpp2::AskForVoteRequest& req,
-        std::shared_ptr<std::unordered_map<HostAddr, std::shared_ptr<Host>>>& hosts);
+        std::vector<std::shared_ptr<Host>>& hosts);
 
     // The method returns the partition's role after the election
     Role processElectionResponses(const ElectionResponses& results);
@@ -289,8 +293,10 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
         LogID lastLogId,
         LogID committedId,
         TermID prevLogTerm,
-        LogID prevLogId);
+        LogID prevLogId,
+        std::vector<std::shared_ptr<Host>> hosts);
 
+    std::vector<std::shared_ptr<Host>> followers() const;
 
 private:
     template
@@ -368,8 +374,7 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
     const GraphSpaceID spaceId_;
     const PartitionID partId_;
     const HostAddr addr_;
-    std::shared_ptr<std::unordered_map<HostAddr, std::shared_ptr<Host>>>
-        peerHosts_;
+    std::vector<std::shared_ptr<Host>> hosts_;
     size_t quorum_{0};
 
     // Partition level lock to synchronize the access of the partition
diff --git a/src/kvstore/raftex/RaftexService.cpp b/src/kvstore/raftex/RaftexService.cpp
index 64182861ac8..1630a4574f1 100644
--- a/src/kvstore/raftex/RaftexService.cpp
+++ b/src/kvstore/raftex/RaftexService.cpp
@@ -32,10 +32,9 @@ std::shared_ptr<RaftexService> RaftexService::createService(
     }
 
     svc->serverThread_.reset(new std::thread([svc] {
-        LOG(INFO) << "Starting the Raftex Service";
-
         svc->server_->setup();
         svc->serverPort_ = svc->server_->getAddress().getPort();
+        LOG(INFO) << "Starting the Raftex Service on " << svc->serverPort_;
         SCOPE_EXIT {
             svc->server_->cleanUp();
         };
diff --git a/src/kvstore/raftex/test/CMakeLists.txt b/src/kvstore/raftex/test/CMakeLists.txt
index c4a843a82b9..b108899b372 100644
--- a/src/kvstore/raftex/test/CMakeLists.txt
+++ b/src/kvstore/raftex/test/CMakeLists.txt
@@ -69,3 +69,26 @@ nebula_link_libraries(
 )
 nebula_add_test(log_cas_test)
 
+add_executable(
+    learner_test
+    LearnerTest.cpp
+    RaftexTestBase.cpp
+    TestShard.cpp
+    $
+    $
+    $
+    $
+    $
+    $
+    $
+    $
+    $
+)
+nebula_link_libraries(
+    learner_test
+    ${THRIFT_LIBRARIES}
+    wangle
+    gtest
+)
+nebula_add_test(learner_test)
+
diff --git a/src/kvstore/raftex/test/LearnerTest.cpp b/src/kvstore/raftex/test/LearnerTest.cpp
new file mode 100644
index 00000000000..30582bfcea0
--- /dev/null
+++ b/src/kvstore/raftex/test/LearnerTest.cpp
@@ -0,0 +1,120 @@
+/* Copyright (c) 2018 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#include "base/Base.h"
+#include <gtest/gtest.h>
+#include <folly/String.h>
+#include "fs/TempDir.h"
+#include "fs/FileUtils.h"
+#include "thread/GenericThreadPool.h"
+#include "network/NetworkUtils.h"
+#include "kvstore/wal/BufferFlusher.h"
+#include "kvstore/raftex/RaftexService.h"
+#include "kvstore/raftex/test/RaftexTestBase.h"
+#include "kvstore/raftex/test/TestShard.h"
+
+DECLARE_uint32(heartbeat_interval);
+
+
+namespace nebula {
+namespace raftex {
+
+TEST(LearnerTest, SimpleTest) {
+    fs::TempDir walRoot("/tmp/learner_test.XXXXXX");
+    std::shared_ptr<thread::GenericThreadPool> workers;
+    std::vector<std::string> wals;
+    std::vector<HostAddr> allHosts;
+    std::vector<std::shared_ptr<RaftexService>> services;
+    std::vector<std::shared_ptr<test::TestShard>> copies;
+
+    std::shared_ptr<test::TestShard> leader;
+    std::vector<bool> isLearner = {false, true, true};
+    // Start three services, the first one will be the leader, the left two will be learners.
+    setupRaft(3, walRoot, workers, wals, allHosts, services, copies, leader, isLearner);
+
+    // The copies[0] is the leader.
+    checkLeadership(copies, 0, leader);
+
+
+    // Append 100 logs
+    LOG(INFO) << "=====> Start appending logs";
+    std::vector<std::string> msgs;
+    for (int i = 1; i <= 100; ++i) {
+        msgs.emplace_back(
+            folly::stringPrintf("Test Log Message %03d", i));
+        auto fut = leader->appendAsync(0, msgs.back());
+        ASSERT_EQ(RaftPart::AppendLogResult::SUCCEEDED,
+                  std::move(fut).get());
+    }
+    LOG(INFO) << "<===== Finish appending logs";
+
+    // Sleep a while to make sure the last log has been committed on
+    // followers
+    sleep(FLAGS_heartbeat_interval);
+
+    // Check every copy
+    for (auto& c : copies) {
+        ASSERT_EQ(100, c->getNumLogs());
+    }
+
+    LogID id = 1;
+    for (int i = 0; i < 100; ++i, ++id) {
+        for (auto& c : copies) {
+            folly::StringPiece msg;
+            ASSERT_TRUE(c->getLogMsg(id, msg));
+            ASSERT_EQ(msgs[i], msg.toString());
+        }
+    }
+
+    LOG(INFO) << "Let's kill the two learners, the leader should still work";
+    for (auto i = 1; i < 3; i++) {
+        services[i]->removePartition(copies[i]);
+    }
+
+    checkLeadership(copies, 0, leader);
+    LOG(INFO) << "=====> Start appending logs again";
+    for (int i = 101; i <= 200; ++i) {
+        msgs.emplace_back(
+            folly::stringPrintf("Test Log Message %03d", i));
+        auto fut = leader->appendAsync(0, msgs.back());
+        ASSERT_EQ(RaftPart::AppendLogResult::SUCCEEDED,
+                  std::move(fut).get());
+    }
+    LOG(INFO) << "<===== Finish appending logs";
+
+    // Sleep a while to make sure the last log has been committed on
+    // followers
+    sleep(FLAGS_heartbeat_interval);
+
+    // Check the leader
+    ASSERT_EQ(200, leader->getNumLogs());
+
+    id = 101;
+    for (int i = 101; i < 200; ++i, ++id) {
+        folly::StringPiece msg;
+        ASSERT_TRUE(leader->getLogMsg(id, msg));
+        ASSERT_EQ(msgs[i - 1], msg.toString());
+    }
+
+    finishRaft(services, copies, workers, leader);
+}
+
+}  // namespace raftex
+}  // namespace nebula
+
+
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);
+    folly::init(&argc, &argv, true);
+    google::SetStderrLogging(google::INFO);
+
+    // `flusher' is extern-declared in RaftexTestBase.h, defined in RaftexTestBase.cpp
+    using nebula::raftex::flusher;
+    flusher = std::make_unique<nebula::wal::BufferFlusher>();
+
+    return RUN_ALL_TESTS();
+}
+
+
diff --git a/src/kvstore/raftex/test/RaftexTestBase.cpp b/src/kvstore/raftex/test/RaftexTestBase.cpp
index bcd8ad0ca0e..441d0eb6d01 100644
--- a/src/kvstore/raftex/test/RaftexTestBase.cpp
+++ b/src/kvstore/raftex/test/RaftexTestBase.cpp
@@ -25,15 +25,21 @@ std::condition_variable leaderCV;
 
 std::vector<HostAddr> getPeers(const std::vector<HostAddr>& all,
-                               const HostAddr& self) {
+                               const HostAddr& self,
+                               std::vector<bool> isLearner) {
+    if (isLearner.empty()) {
+        isLearner.resize(all.size(), false);
+    }
     std::vector<HostAddr> peers;
+    size_t index = 0;
     for (const auto& host : all) {
-        if (host != self) {
+        if (host != self && !isLearner[index]) {
             VLOG(2) << "Adding host "
                     << NetworkUtils::intToIPv4(host.first)
                     << ":" << host.second;
             peers.emplace_back(host);
         }
+        index++;
     }
 
     return std::move(peers);
@@ -117,7 +123,8 @@ void setupRaft(
         std::vector<HostAddr>& allHosts,
         std::vector<std::shared_ptr<RaftexService>>& services,
         std::vector<std::shared_ptr<test::TestShard>>& copies,
-        std::shared_ptr<test::TestShard>& leader) {
+        std::shared_ptr<test::TestShard>& leader,
+        std::vector<bool> isLearner) {
     uint32_t ipInt;
     CHECK(NetworkUtils::ipv4ToInt("127.0.0.1", ipInt));
 
@@ -141,6 +148,9 @@ void setupRaft(
         allHosts.emplace_back(ipInt, port);
     }
 
+    if (isLearner.empty()) {
+        isLearner.resize(allHosts.size(), false);
+    }
     // Create one copy of the shard for each service
     for (size_t i = 0; i < services.size(); i++) {
         copies.emplace_back(std::make_shared<test::TestShard>(
@@ -165,7 +175,15 @@ void setupRaft(
                                       std::placeholders::_2,
                                       std::placeholders::_3)));
         services[i]->addPartition(copies.back());
-        copies.back()->start(getPeers(allHosts, allHosts[i]));
+        copies.back()->start(getPeers(allHosts, allHosts[i], isLearner),
+                             isLearner[i]);
+        if (!isLearner[i]) {
+            for (size_t j = 0; j < isLearner.size(); j++) {
+                if (j != i && isLearner[j]) {
+                    copies.back()->addLearner(allHosts[j]);
+                }
+            }
+        }
     }
 
     // Wait untill all copies agree on the same leader
@@ -184,8 +202,10 @@ void finishRaft(std::vector<std::shared_ptr<RaftexService>>& services,
     for (auto& svc : services) {
         svc->stop();
     }
+    LOG(INFO) << "Stopping workers...";
    workers->stop();
    workers->wait();
+    LOG(INFO) << "Waiting for all service stopped";
    for (auto& svc : services) {
        svc->waitUntilStop();
    }
@@ -204,6 +224,18 @@ void checkLeadership(std::vector<std::shared_ptr<test::TestShard>>& copies,
     }
 }
 
+/**
+ * Check copies[index] is the leader.
+ * */
+void checkLeadership(std::vector<std::shared_ptr<test::TestShard>>& copies,
+                     size_t index,
+                     std::shared_ptr<test::TestShard>& leader) {
+    std::lock_guard<std::mutex> lock(leaderMutex);
+    ASSERT_FALSE(!leader);
+    ASSERT_EQ(leader->address(), copies[index]->address());
+}
+
+
 }  // namespace raftex
 }  // namespace nebula
diff --git a/src/kvstore/raftex/test/RaftexTestBase.h b/src/kvstore/raftex/test/RaftexTestBase.h
index a9664ae8c4b..98ea33b955b 100644
--- a/src/kvstore/raftex/test/RaftexTestBase.h
+++ b/src/kvstore/raftex/test/RaftexTestBase.h
@@ -38,7 +38,8 @@ extern std::condition_variable leaderCV;
 
 std::vector<HostAddr> getPeers(const std::vector<HostAddr>& all,
-                               const HostAddr& self);
+                               const HostAddr& self,
+                               std::vector<bool> isLearner = {});
 
 void onLeaderElected(
     std::vector<std::shared_ptr<test::TestShard>>& copies,
@@ -66,7 +67,8 @@ void setupRaft(
     std::vector<HostAddr>& allHosts,
     std::vector<std::shared_ptr<RaftexService>>& services,
     std::vector<std::shared_ptr<test::TestShard>>& copies,
-    std::shared_ptr<test::TestShard>& leader);
+    std::shared_ptr<test::TestShard>& leader,
+    std::vector<bool> isLearner = {});
 
 void finishRaft(std::vector<std::shared_ptr<RaftexService>>& services,
                 std::vector<std::shared_ptr<test::TestShard>>& copies,
@@ -76,6 +78,9 @@ void finishRaft(std::vector<std::shared_ptr<RaftexService>>& services,
 void checkLeadership(std::vector<std::shared_ptr<test::TestShard>>& copies,
                      std::shared_ptr<test::TestShard>& leader);
 
+void checkLeadership(std::vector<std::shared_ptr<test::TestShard>>& copies,
+                     size_t index,
+                     std::shared_ptr<test::TestShard>& leader);
 
 class RaftexTestFixture : public ::testing::Test {
 public: