diff --git a/src/common/base/CollectNSucceeded.h b/src/common/base/CollectNSucceeded.h index 8fd2fff33fe..e6ff2053ec3 100644 --- a/src/common/base/CollectNSucceeded.h +++ b/src/common/base/CollectNSucceeded.h @@ -49,8 +49,10 @@ template using FutureReturnType = typename std::iterator_traits::value_type::value_type; +// The result pair list. +// The first element in pair represents the index in Requests list. template -using SucceededResultList = std::vector>; +using SucceededResultList = std::vector>>; template diff --git a/src/common/base/CollectNSucceeded.inl b/src/common/base/CollectNSucceeded.inl index ca8d2a71646..bad898f9a23 100644 --- a/src/common/base/CollectNSucceeded.inl +++ b/src/common/base/CollectNSucceeded.inl @@ -16,6 +16,9 @@ folly::Future> collectNSucceeded( size_t n, ResultEval&& eval) { using Result = SucceededResultList; + if (n == 0) { + return folly::Future(Result()); + } struct Context { Context(size_t total, ResultEval&& e) @@ -30,7 +33,6 @@ folly::Future> collectNSucceeded( }; size_t total = size_t(std::distance(first, last)); - DCHECK_GT(n, 0U); DCHECK_GE(total, 0U); if (total < n) { @@ -45,12 +47,12 @@ folly::Future> collectNSucceeded( // for each succeeded Future, add to the result list, until // we have required number of futures, at which point we fulfil // the promise with the result list - for (; first != last; ++first) { - first->setCallback_([n, ctx] ( + for (size_t index = 0; first != last; ++first, ++index) { + first->setCallback_([n, ctx, index] ( folly::Try>&& t) { if (!ctx->promise.isFulfilled()) { - if (!t.hasException() && ctx->eval(t.value())) { - ctx->results.emplace_back(std::move(t.value())); + if (!t.hasException() && ctx->eval(index, t.value())) { + ctx->results.emplace_back(index, std::move(t.value())); } if ((++ctx->numCompleted) == ctx->nTotal || ctx->results.size() == n) { diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index 80059ed65a1..a413d9d395f 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -50,7 +50,7 @@ struct StoreCapability { }; #define SUPPORT_FILTERING(store) (store.capability() & StoreCapability::SC_FILTERING) - +class Part; /** * Interface for all kv-stores **/ @@ -136,6 +136,9 @@ class KVStore { const std::string& prefix, KVCallback cb) = 0; + virtual ErrorOr> part(GraphSpaceID spaceId, + PartitionID partId) = 0; + protected: KVStore() = default; }; diff --git a/src/kvstore/LogEncoder.cpp b/src/kvstore/LogEncoder.cpp index 8727a30d345..450f5f55257 100644 --- a/src/kvstore/LogEncoder.cpp +++ b/src/kvstore/LogEncoder.cpp @@ -144,6 +144,29 @@ std::vector decodeMultiValues(folly::StringPiece encoded) { return values; } +std::string encodeLearner(const HostAddr& learner) { + std::string encoded; + encoded.reserve(kHeadLen + sizeof(HostAddr)); + // Timestamp (8 bytes) + int64_t ts = time::WallClock::fastNowInMilliSec(); + encoded.append(reinterpret_cast(&ts), sizeof(int64_t)); + // Log type + auto type = LogType::OP_ADD_LEARNER; + encoded.append(reinterpret_cast(&type), 1); + encoded.append(reinterpret_cast(&learner), sizeof(HostAddr)); + return encoded; +} + +HostAddr decodeLearner(const std::string& encoded) { + HostAddr addr; + CHECK_EQ(kHeadLen + sizeof(HostAddr), encoded.size()); + memcpy(&addr.first, encoded.data() + kHeadLen, sizeof(addr.first)); + memcpy(&addr.second, + encoded.data() + kHeadLen + sizeof(addr.first), + sizeof(addr.second)); + return addr; +} + } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/LogEncoder.h b/src/kvstore/LogEncoder.h index d96a621042b..4d656770b2c 100644 --- a/src/kvstore/LogEncoder.h +++ b/src/kvstore/LogEncoder.h @@ -19,6 +19,7 @@ enum LogType : char { OP_MULTI_REMOVE = 0x4, OP_REMOVE_PREFIX = 0x5, OP_REMOVE_RANGE = 0x6, + OP_ADD_LEARNER = 0x07, }; @@ -32,6 +33,9 @@ std::string encodeMultiValues(LogType type, folly::StringPiece v2); std::vector decodeMultiValues(folly::StringPiece encoded); +std::string encodeLearner(const HostAddr& learner); +HostAddr decodeLearner(const std::string& encoded); + } // namespace kvstore } // namespace nebula #endif // KVSTORE_LOGENCODER_H_ diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index dd5b1391bfa..86b93311ba0 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -339,9 +339,13 @@ void NebulaStore::asyncMultiPut(GraphSpaceID spaceId, PartitionID partId, std::vector keyValues, KVCallback cb) { - folly::RWSpinLock::ReadHolder rh(&lock_); - CHECK_FOR_WRITE(spaceId, partId, cb); - return partIt->second->asyncMultiPut(std::move(keyValues), std::move(cb)); + auto ret = part(spaceId, partId); + if (!ok(ret)) { + cb(error(ret)); + return; + } + auto part = nebula::value(ret); + return part->asyncMultiPut(std::move(keyValues), std::move(cb)); } @@ -349,9 +353,13 @@ void NebulaStore::asyncRemove(GraphSpaceID spaceId, PartitionID partId, const std::string& key, KVCallback cb) { - folly::RWSpinLock::ReadHolder rh(&lock_); - CHECK_FOR_WRITE(spaceId, partId, cb); - return partIt->second->asyncRemove(key, std::move(cb)); + auto ret = part(spaceId, partId); + if (!ok(ret)) { + cb(error(ret)); + return; + } + auto part = nebula::value(ret); + return part->asyncRemove(key, std::move(cb)); } @@ -359,9 +367,13 @@ void NebulaStore::asyncMultiRemove(GraphSpaceID spaceId, PartitionID partId, std::vector keys, KVCallback cb) { - folly::RWSpinLock::ReadHolder rh(&lock_); - CHECK_FOR_WRITE(spaceId, partId, cb); - return partIt->second->asyncMultiRemove(std::move(keys), std::move(cb)); + auto ret = part(spaceId, partId); + if (!ok(ret)) { + cb(error(ret)); + return; + } + auto part = nebula::value(ret); + return part->asyncMultiRemove(std::move(keys), std::move(cb)); } @@ -370,9 +382,13 @@ void NebulaStore::asyncRemoveRange(GraphSpaceID spaceId, const std::string& start, const std::string& end, KVCallback cb) { - folly::RWSpinLock::ReadHolder rh(&lock_); - CHECK_FOR_WRITE(spaceId, partId, cb); - return partIt->second->asyncRemoveRange(start, end, std::move(cb)); + auto ret = part(spaceId, partId); + if (!ok(ret)) { + cb(error(ret)); + return; + } + auto part = nebula::value(ret); + return part->asyncRemoveRange(start, end, std::move(cb)); } @@ -380,9 +396,28 @@ void NebulaStore::asyncRemovePrefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix, KVCallback cb) { + auto ret = part(spaceId, partId); + if (!ok(ret)) { + cb(error(ret)); + return; + } + auto part = nebula::value(ret); + return part->asyncRemovePrefix(prefix, std::move(cb)); +} + +ErrorOr> NebulaStore::part(GraphSpaceID spaceId, + PartitionID partId) { folly::RWSpinLock::ReadHolder rh(&lock_); - CHECK_FOR_WRITE(spaceId, partId, cb); - return partIt->second->asyncRemovePrefix(prefix, std::move(cb)); + auto it = spaces_.find(spaceId); + if (UNLIKELY(it == spaces_.end())) { + return ResultCode::ERR_SPACE_NOT_FOUND; + } + auto& parts = it->second->parts_; + auto partIt = parts.find(partId); + if (UNLIKELY(partIt == parts.end())) { + return ResultCode::ERR_PART_NOT_FOUND; + } + return partIt->second; } diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 2df871fe0bf..62207157153 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -139,6 +139,9 @@ class NebulaStore : public KVStore, public Handler { const std::string& prefix, KVCallback cb) override; + ErrorOr> part(GraphSpaceID spaceId, + PartitionID partId) override; + ResultCode ingest(GraphSpaceID spaceId, const std::string& extra, const std::vector& files); diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 586dbaf92cf..693cd262f4a 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -132,6 +132,13 @@ void Part::asyncRemoveRange(folly::StringPiece start, }); } +void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) { + std::string log = encodeLearner(learner); + sendCommandAsync(std::move(log)) + .then([callback = std::move(cb)] (AppendLogResult res) mutable { + callback(toResultCode(res)); + }); +} void Part::onLostLeadership(TermID term) { VLOG(1) << "Lost the leadership for the term " << term; @@ -219,6 +226,9 @@ bool Part::commitLogs(std::unique_ptr iter) { } break; } + case OP_ADD_LEARNER: { + break; + } default: { LOG(FATAL) << "Unknown operation: " << static_cast(log[0]); } @@ -243,6 +253,19 @@ bool Part::preProcessLog(LogID logId, << ", termId " << termId << ", clusterId " << clusterId << ", log " << log; + if (!log.empty()) { + switch (log[sizeof(int64_t)]) { + case OP_ADD_LEARNER: { + auto learner = decodeLearner(log); + addLearner(learner); + LOG(INFO) << idStr_ << "Add learner " << learner; + break; + } + default: { + break; + } + } + } return true; } diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index 1a4ee5d7b6e..c4b4e4d2339 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -44,6 +44,7 @@ class Part : public raftex::RaftPart { folly::StringPiece end, KVCallback cb); + void asyncAddLearner(const HostAddr& learner, KVCallback cb); /** * Methods inherited from RaftPart */ diff --git a/src/kvstore/plugins/hbase/HBaseStore.h b/src/kvstore/plugins/hbase/HBaseStore.h index 171ebd47078..d480cc8b1c9 100644 --- a/src/kvstore/plugins/hbase/HBaseStore.h +++ b/src/kvstore/plugins/hbase/HBaseStore.h @@ -126,6 +126,11 @@ class HBaseStore : public KVStore { const std::string& prefix, KVCallback cb) override; + ErrorOr> part(GraphSpaceID, + PartitionID) override { + LOG(FATAL) << "Unsupported!"; + } + private: std::string getRowKey(const std::string& key) { return key.substr(sizeof(PartitionID), key.size() - sizeof(PartitionID)); diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index a5877fb29ce..361b6123ae9 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -22,9 +22,10 @@ namespace raftex { using nebula::network::NetworkUtils; -Host::Host(const HostAddr& addr, std::shared_ptr part) +Host::Host(const HostAddr& addr, std::shared_ptr part, bool isLearner) : part_(std::move(part)) , addr_(addr) + , isLearner_(isLearner) , idStr_(folly::stringPrintf( "[Host: %s:%d] ", NetworkUtils::intToIPv4(addr_.first).c_str(), @@ -39,6 +40,7 @@ void Host::waitForStop() { noMoreRequestCV_.wait(g, [this] { return !requestOnGoing_; }); + LOG(INFO) << idStr_ << "The host has been stopped!"; } @@ -61,6 +63,17 @@ cpp2::ErrorCode Host::checkStatus(std::lock_guard& lck) const { folly::Future Host::askForVote( const cpp2::AskForVoteRequest& req) { + { + std::lock_guard g(lock_); + auto res = checkStatus(g); + if (res != cpp2::ErrorCode::SUCCEEDED) { + VLOG(2) << idStr_ + << "The Host is not in a proper status, do not send"; + cpp2::AskForVoteResponse resp; + resp.set_error_code(res); + return resp; + } + } auto client = tcManager().client(addr_); return client->future_askForVote(req); } @@ -193,6 +206,12 @@ folly::Future Host::appendLogsInternal( } cpp2::AppendLogResponse resp = std::move(t).value(); + VLOG(3) << self->idStr_ << "AppendLogResponse " + << "code " << static_cast(resp.get_error_code()) + << ", currTerm " << resp.get_current_term() + << ", lastLogId " << resp.get_last_log_id() + << ", lastLogTerm " << resp.get_last_log_term() + << ", commitLogId " << resp.get_committed_log_id(); switch (resp.get_error_code()) { case cpp2::ErrorCode::SUCCEEDED: { VLOG(2) << self->idStr_ @@ -378,6 +397,9 @@ folly::Future Host::sendAppendLogRequest( } } + VLOG(3) << idStr_ << "sendAppendLogRequest, lastLogId " << req->get_last_log_id() + << ", lastCommittedLogId " << req->get_committed_log_id() + << ", lastLogIdSend " << req->get_last_log_id_sent(); // Get client connection auto client = tcManager().client(addr_); return client->future_appendLog(*req); diff --git a/src/kvstore/raftex/Host.h b/src/kvstore/raftex/Host.h index c8d4e412499..99d04cd844b 100644 --- a/src/kvstore/raftex/Host.h +++ b/src/kvstore/raftex/Host.h @@ -25,7 +25,11 @@ class RaftPart; class Host final : public std::enable_shared_from_this { public: - Host(const HostAddr& addr, std::shared_ptr part); + Host(const HostAddr& addr, std::shared_ptr part, bool isLearner = false); + + ~Host() { + LOG(INFO) << idStr_ << " The host has been destroyed!"; + } const char* idStr() const { return idStr_.c_str(); @@ -50,6 +54,10 @@ class Host final : public std::enable_shared_from_this { void waitForStop(); + bool isLearner() const { + return isLearner_; + } + folly::Future askForVote( const cpp2::AskForVoteRequest& req); @@ -62,6 +70,9 @@ class Host final : public std::enable_shared_from_this { TermID lastLogTermSent, // The last log term being sent LogID lastLogIdSent); // The last log id being sent + const HostAddr& address() const { + return addr_; + } private: cpp2::ErrorCode checkStatus(std::lock_guard& lck) const; @@ -84,9 +95,9 @@ class Host final : public std::enable_shared_from_this { private: std::shared_ptr part_; const HostAddr addr_; + bool isLearner_ = false; const std::string idStr_; - mutable std::mutex lock_; bool paused_{false}; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index fb7f5954d74..2533b26469f 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -230,7 +230,7 @@ RaftPart::~RaftPart() { // Make sure the partition has stopped CHECK(status_ == Status::STOPPED); - LOG(INFO) << idStr_ << "~RaftPart()"; + LOG(INFO) << idStr_ << " The part has been destroyed..."; } @@ -242,6 +242,8 @@ const char* RaftPart::roleStr(Role role) const { return "Follower"; case Role::CANDIDATE: return "Candidate"; + case Role::LEARNER: + return "Learner"; default: LOG(FATAL) << idStr_ << "Invalid role"; } @@ -249,32 +251,31 @@ const char* RaftPart::roleStr(Role role) const { } -void RaftPart::start(std::vector&& peers) { +void RaftPart::start(std::vector&& peers, bool asLearner) { std::lock_guard g(raftLock_); // Set the quorum number quorum_ = (peers.size() + 1) / 2; - VLOG(2) << idStr_ << "There are " - << peers.size() - << " peer hosts, and total " - << peers.size() + 1 - << " copies. The quorum is " << quorum_ + 1; + LOG(INFO) << idStr_ << "There are " + << peers.size() + << " peer hosts, and total " + << peers.size() + 1 + << " copies. The quorum is " << quorum_ + 1 + << ", as learner " << asLearner; committedLogId_ = lastCommittedLogId(); // Start all peer hosts - peerHosts_ = std::make_shared< - std::unordered_map> - >(); for (auto& addr : peers) { - peerHosts_->emplace( - addr, - std::make_shared(addr, shared_from_this())); + auto hostPtr = std::make_shared(addr, shared_from_this()); + hosts_.emplace_back(hostPtr); } // Change the status status_ = Status::RUNNING; - + if (asLearner) { + role_ = Role::LEARNER; + } // Set up a leader election task size_t delayMS = 100 + folly::Random::rand32(900); workers_->addDelayTask(delayMS, [self = shared_from_this()] { @@ -286,28 +287,26 @@ void RaftPart::start(std::vector&& peers) { void RaftPart::stop() { VLOG(2) << idStr_ << "Stopping the partition"; - decltype(peerHosts_) hosts; + decltype(hosts_) hosts; { std::unique_lock lck(raftLock_); status_ = Status::STOPPED; - hosts = std::move(peerHosts_); + hosts = std::move(hosts_); } - if (hosts) { - for (auto& h : *hosts) { - h.second->stop(); - } + for (auto& h : hosts) { + h->stop(); + } - VLOG(2) << idStr_ << "Invoked stop() on all peer hosts"; + VLOG(2) << idStr_ << "Invoked stop() on all peer hosts"; - for (auto& h : *hosts) { - VLOG(2) << idStr_ << "Waiting " << h.second->idStr() << " to stop"; - h.second->waitForStop(); - VLOG(2) << idStr_ << h.second->idStr() << "has stopped"; - } - VLOG(2) << idStr_ << "All hosts are stopped"; + for (auto& h : hosts) { + VLOG(2) << idStr_ << "Waiting " << h->idStr() << " to stop"; + h->waitForStop(); + VLOG(2) << idStr_ << h->idStr() << "has stopped"; } + hosts.clear(); LOG(INFO) << idStr_ << "Partition has been stopped"; } @@ -330,6 +329,23 @@ AppendLogResult RaftPart::canAppendLogs() { return AppendLogResult::SUCCEEDED; } +void RaftPart::addLearner(const HostAddr& addr) { + CHECK(!raftLock_.try_lock()); + if (addr == addr_) { + LOG(INFO) << idStr_ << "I am learner!"; + return; + } + auto it = std::find_if(hosts_.begin(), hosts_.end(), [&addr] (const auto& h) { + return h->address() == addr; + }); + if (it == hosts_.end()) { + hosts_.emplace_back(std::make_shared(addr, shared_from_this(), true)); + LOG(INFO) << idStr_ << "Add learner " << addr; + } else { + LOG(INFO) << idStr_ << "The host " << addr << " has been existed as " + << ((*it)->isLearner() ? " learner " : " group member"); + } +} folly::Future RaftPart::appendAsync(ClusterID source, std::string log) { @@ -512,7 +528,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb, LogID prevLogId) { using namespace folly; // NOLINT since the fancy overload of | operator - decltype(peerHosts_) hosts; + decltype(hosts_) hosts; { std::lock_guard g(raftLock_); @@ -532,56 +548,41 @@ void RaftPart::replicateLogs(folly::EventBase* eb, return; } - hosts = peerHosts_; + hosts = hosts_; } VLOG(2) << idStr_ << "About to replicate logs to all peer hosts"; - if (!hosts || hosts->empty()) { - // No peer - VLOG(2) << idStr_ << "The leader has no peer"; - processAppendLogResponses(AppendLogResponses(), - eb, - std::move(iter), - currTerm, - lastLogId, - committedId, - prevLogTerm, - prevLogId); - return; - } - - using PeerHostEntry = typename decltype(peerHosts_)::element_type::value_type; collectNSucceeded( - gen::from(*hosts) + gen::from(hosts) | gen::map([self = shared_from_this(), eb, currTerm, lastLogId, prevLogId, prevLogTerm, - committedId] (PeerHostEntry& host) { + committedId] (std::shared_ptr hostPtr) { VLOG(2) << self->idStr_ << "Appending logs to " - << NetworkUtils::intToIPv4(host.first.first) - << ":" << host.first.second; + << hostPtr->idStr(); return via( eb, [=] () -> Future { - return host.second->appendLogs(eb, - currTerm, - lastLogId, - committedId, - prevLogTerm, - prevLogId); + return hostPtr->appendLogs(eb, + currTerm, + lastLogId, + committedId, + prevLogTerm, + prevLogId); }); }) | gen::as(), // Number of succeeded required quorum_, // Result evaluator - [](cpp2::AppendLogResponse& resp) { - return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED; + [hosts] (size_t index, cpp2::AppendLogResponse& resp) { + return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED + && !hosts[index]->isLearner(); }) .then(eb, [self = shared_from_this(), eb, @@ -590,7 +591,8 @@ void RaftPart::replicateLogs(folly::EventBase* eb, lastLogId, committedId, prevLogId, - prevLogTerm] (folly::Try&& result) mutable { + prevLogTerm, + pHosts = std::move(hosts)] (folly::Try&& result) mutable { VLOG(2) << self->idStr_ << "Received enough response"; CHECK(!result.hasException()); @@ -601,7 +603,8 @@ void RaftPart::replicateLogs(folly::EventBase* eb, lastLogId, committedId, prevLogTerm, - prevLogId); + prevLogId, + std::move(pHosts)); return *result; }); @@ -616,11 +619,13 @@ void RaftPart::processAppendLogResponses( LogID lastLogId, LogID committedId, TermID prevLogTerm, - LogID prevLogId) { + LogID prevLogId, + std::vector> hosts) { // Make sure majority have succeeded size_t numSucceeded = 0; for (auto& res : resps) { - if (res.get_error_code() == cpp2::ErrorCode::SUCCEEDED) { + if (!hosts[res.first]->isLearner() + && res.second.get_error_code() == cpp2::ErrorCode::SUCCEEDED) { ++numSucceeded; } } @@ -640,7 +645,6 @@ void RaftPart::processAppendLogResponses( replicatingLogs_ = false; return; } - if (role_ != Role::LEADER) { // Is not a leader any more VLOG(2) << idStr_ << "The leader has changed"; @@ -747,7 +751,7 @@ bool RaftPart::needToStartElection() { bool RaftPart::prepareElectionRequest( cpp2::AskForVoteRequest& req, - std::shared_ptr>>& hosts) { + std::vector>& hosts) { std::lock_guard g(raftLock_); // Make sure the partition is running @@ -770,7 +774,7 @@ bool RaftPart::prepareElectionRequest( req.set_last_log_id(lastLogId_); req.set_last_log_term(lastLogTerm_); - hosts = peerHosts_; + hosts = followers(); return true; } @@ -795,7 +799,7 @@ typename RaftPart::Role RaftPart::processElectionResponses( size_t numSucceeded = 0; for (auto& r : results) { - if (r.get_error_code() == cpp2::ErrorCode::SUCCEEDED) { + if (r.second.get_error_code() == cpp2::ErrorCode::SUCCEEDED) { ++numSucceeded; } } @@ -819,7 +823,7 @@ bool RaftPart::leaderElection() { using namespace folly; // NOLINT since the fancy overload of | operator cpp2::AskForVoteRequest voteReq; - decltype(peerHosts_) hosts; + decltype(hosts_) hosts; if (!prepareElectionRequest(voteReq, hosts)) { return false; } @@ -837,30 +841,28 @@ bool RaftPart::leaderElection() { << ")"; auto resps = ElectionResponses(); - if (!hosts || hosts->empty()) { + if (hosts.empty()) { VLOG(2) << idStr_ << "No peer found, I will be the leader"; } else { auto eb = ioThreadPool_->getEventBase(); auto futures = collectNSucceeded( - gen::from(*hosts) - | gen::map([eb, self = shared_from_this(), &voteReq] ( - decltype(peerHosts_)::element_type::value_type& host) { + gen::from(hosts) + | gen::map([eb, self = shared_from_this(), &voteReq] (auto& host) { VLOG(2) << self->idStr_ << "Sending AskForVoteRequest to " - << NetworkUtils::intToIPv4(host.first.first) - << ":" << host.first.second; + << host->idStr(); return via( eb, [&voteReq, &host] () -> Future { - return host.second->askForVote(voteReq); + return host->askForVote(voteReq); }); }) | gen::as(), // Number of succeeded required quorum_, // Result evaluator - [](cpp2::AskForVoteResponse& resp) { + [](size_t, cpp2::AskForVoteResponse& resp) { return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED; }); @@ -907,6 +909,10 @@ bool RaftPart::leaderElection() { << "No one is elected, continue the election"; return false; } + case Role::LEARNER: { + LOG(FATAL) << idStr_ << " Impossible! There must be some bugs!"; + return false; + } } LOG(FATAL) << "Should not reach here"; @@ -1198,6 +1204,7 @@ cpp2::ErrorCode RaftPart::verifyLeader( VLOG(2) << idStr_ << "The current role is " << roleStr(role_); UNUSED(lck); switch (role_) { + case Role::LEARNER: case Role::FOLLOWER: { if (req.get_current_term() == term_ && req.get_leader_ip() == leader_.first && @@ -1234,7 +1241,9 @@ cpp2::ErrorCode RaftPart::verifyLeader( << ":" << req.get_leader_port() << " [Term: " << req.get_current_term() << "]"; - role_ = Role::FOLLOWER; + if (role_ != Role::LEARNER) { + role_ = Role::FOLLOWER; + } leader_ = std::make_pair(req.get_leader_ip(), req.get_leader_port()); term_ = proposedTerm_ = req.get_current_term(); @@ -1248,6 +1257,18 @@ folly::Future RaftPart::sendHeartbeat() { return appendLogAsync(clusterId_, LogType::NORMAL, std::move(log)); } +std::vector> RaftPart::followers() const { + CHECK(!raftLock_.try_lock()); + decltype(hosts_) hosts; + for (auto& h : hosts_) { + if (!h->isLearner()) { + hosts.emplace_back(h); + } + } + return hosts; +} + + } // namespace raftex } // namespace nebula diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 4d1b0b98e65..5f789069693 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -82,6 +82,11 @@ class RaftPart : public std::enable_shared_from_this { return role_ == Role::FOLLOWER; } + bool isLearner() const { + std::lock_guard g(raftLock_); + return role_ == Role::LEARNER; + } + ClusterID clusterId() const { return clusterId_; } @@ -107,9 +112,11 @@ class RaftPart : public std::enable_shared_from_this { return wal_; } + void addLearner(const HostAddr& learner); + // Change the partition status to RUNNING. This is called // by the inherited class, when it's ready to serve - virtual void start(std::vector&& peers); + virtual void start(std::vector&& peers, bool asLearner = false); // Change the partition status to STOPPED. This is called // by the inherited class, when it's about to stop @@ -219,17 +226,19 @@ class RaftPart : public std::enable_shared_from_this { enum class Role { LEADER = 1, // the leader FOLLOWER, // following a leader - CANDIDATE // Has sent AskForVote request + CANDIDATE, // Has sent AskForVote request + LEARNER // It is the same with FOLLOWER, + // except it does not participate in leader election }; // A list of // idx -- the index of the peer // resp -- AskForVoteResponse - using ElectionResponses = std::vector; + using ElectionResponses = std::vector>; // A list of // idx -- the index of the peer // resp -- AppendLogResponse - using AppendLogResponses = std::vector; + using AppendLogResponses = std::vector>; // using LogCache = std::vector< @@ -274,7 +283,7 @@ class RaftPart : public std::enable_shared_from_this { // return FALSE bool prepareElectionRequest( cpp2::AskForVoteRequest& req, - std::shared_ptr>>& hosts); + std::vector>& hosts); // The method returns the partition's role after the election Role processElectionResponses(const ElectionResponses& results); @@ -306,8 +315,10 @@ class RaftPart : public std::enable_shared_from_this { LogID lastLogId, LogID committedId, TermID prevLogTerm, - LogID prevLogId); + LogID prevLogId, + std::vector> hosts); + std::vector> followers() const; protected: template @@ -393,8 +404,7 @@ class RaftPart : public std::enable_shared_from_this { const GraphSpaceID spaceId_; const PartitionID partId_; const HostAddr addr_; - std::shared_ptr>> - peerHosts_; + std::vector> hosts_; size_t quorum_{0}; // The lock is used to protect logs_ and cachingPromise_ diff --git a/src/kvstore/raftex/RaftexService.cpp b/src/kvstore/raftex/RaftexService.cpp index 64182861ac8..1630a4574f1 100644 --- a/src/kvstore/raftex/RaftexService.cpp +++ b/src/kvstore/raftex/RaftexService.cpp @@ -32,10 +32,9 @@ std::shared_ptr RaftexService::createService( } svc->serverThread_.reset(new std::thread([svc] { - LOG(INFO) << "Starting the Raftex Service"; - svc->server_->setup(); svc->serverPort_ = svc->server_->getAddress().getPort(); + LOG(INFO) << "Starting the Raftex Service on " << svc->serverPort_; SCOPE_EXIT { svc->server_->cleanUp(); }; diff --git a/src/kvstore/raftex/test/CMakeLists.txt b/src/kvstore/raftex/test/CMakeLists.txt index 9b7339b061e..939bef1d7c3 100644 --- a/src/kvstore/raftex/test/CMakeLists.txt +++ b/src/kvstore/raftex/test/CMakeLists.txt @@ -92,3 +92,26 @@ nebula_link_libraries( ) nebula_add_test(log_command_test) +add_executable( + learner_test + LearnerTest.cpp + RaftexTestBase.cpp + TestShard.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +nebula_link_libraries( + learner_test + ${THRIFT_LIBRARIES} + wangle + gtest +) +nebula_add_test(learner_test) + diff --git a/src/kvstore/raftex/test/LearnerTest.cpp b/src/kvstore/raftex/test/LearnerTest.cpp new file mode 100644 index 00000000000..bb879761370 --- /dev/null +++ b/src/kvstore/raftex/test/LearnerTest.cpp @@ -0,0 +1,193 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "base/Base.h" +#include +#include +#include "fs/TempDir.h" +#include "fs/FileUtils.h" +#include "thread/GenericThreadPool.h" +#include "network/NetworkUtils.h" +#include "kvstore/wal/BufferFlusher.h" +#include "kvstore/raftex/RaftexService.h" +#include "kvstore/raftex/test/RaftexTestBase.h" +#include "kvstore/raftex/test/TestShard.h" + +DECLARE_uint32(heartbeat_interval); + + +namespace nebula { +namespace raftex { + +TEST(LearnerTest, OneLeaderOneFollowerOneLearnerTest) { + fs::TempDir walRoot("/tmp/learner_test.XXXXXX"); + std::shared_ptr workers; + std::vector wals; + std::vector allHosts; + std::vector> services; + std::vector> copies; + + std::shared_ptr leader; + std::vector isLearner = {false, false, true}; + // The last one is learner + setupRaft(3, walRoot, workers, wals, allHosts, services, copies, leader, isLearner); + + checkLeadership(copies, leader); + + auto f = leader->sendCommandAsync(test::encodeLearner(allHosts[2])); + f.wait(); + + std::vector msgs; + LogID id = -1; + appendLogs(1, 100, leader, msgs, id); + + sleep(FLAGS_heartbeat_interval); + + // Check every copy + for (auto& c : copies) { + ASSERT_EQ(100, c->getNumLogs()); + } + + for (int i = 0; i < 100; ++i, ++id) { + for (auto& c : copies) { + folly::StringPiece msg; + ASSERT_TRUE(c->getLogMsg(id, msg)) << "id :" << id << ", i:" << i; + ASSERT_EQ(msgs[i], msg.toString()); + } + } + + finishRaft(services, copies, workers, leader); +} + +TEST(LearnerTest, OneLeaderTwoLearnerTest) { + fs::TempDir walRoot("/tmp/learner_test.XXXXXX"); + std::shared_ptr workers; + std::vector wals; + std::vector allHosts; + std::vector> services; + std::vector> copies; + + std::shared_ptr leader; + std::vector isLearner = {false, true, true}; + // Start three services, the first one will be the leader, the left two will be learners. + setupRaft(3, walRoot, workers, wals, allHosts, services, copies, leader, isLearner); + + // The copies[0] is the leader. + checkLeadership(copies, 0, leader); + + leader->sendCommandAsync(test::encodeLearner(allHosts[1])); + auto f = leader->sendCommandAsync(test::encodeLearner(allHosts[2])); + f.wait(); + + std::vector msgs; + LogID id = -1; + appendLogs(1, 100, leader, msgs, id); + sleep(FLAGS_heartbeat_interval); + + // Check every copy + for (auto& c : copies) { + ASSERT_EQ(100, c->getNumLogs()); + } + + for (int i = 0; i < 100; ++i, ++id) { + for (auto& c : copies) { + folly::StringPiece msg; + ASSERT_TRUE(c->getLogMsg(id, msg)) << "id :" << id << ", i:" << i; + ASSERT_EQ(msgs[i], msg.toString()); + } + } + + LOG(INFO) << "Let's kill the two learners, the leader should still work"; + for (auto i = 1; i < 3; i++) { + services[i]->removePartition(copies[i]); + } + + checkLeadership(copies, 0, leader); + + appendLogs(101, 200, leader, msgs, id); + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_heartbeat_interval/2); + + // Check the leader + ASSERT_EQ(200, leader->getNumLogs()); + + for (int i = 101; i < 200; ++i, ++id) { + folly::StringPiece msg; + ASSERT_TRUE(leader->getLogMsg(id, msg)) << "id :" << id << ", i:" << i; + ASSERT_EQ(msgs[i - 1], msg.toString()); + } + finishRaft(services, copies, workers, leader); +} + +TEST(LearnerTest, CatchUpDataTest) { + fs::TempDir walRoot("/tmp/catch_up_data.XXXXXX"); + std::shared_ptr workers; + std::vector wals; + std::vector allHosts; + std::vector> services; + std::vector> copies; + + std::shared_ptr leader; + std::vector isLearner = {false, false, false, true}; + setupRaft(4, walRoot, workers, wals, allHosts, services, copies, leader, isLearner); + + // Check all hosts agree on the same leader + checkLeadership(copies, leader); + + std::vector msgs; + LogID id = -1; + appendLogs(1, 100, leader, msgs, id); + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_heartbeat_interval); + + // Check every copy + for (int i = 0; i < 3; i++) { + ASSERT_EQ(100, copies[i]->getNumLogs()); + } + + for (int i = 0; i < 100; ++i, ++id) { + for (int j = 0; j < 3; j++) { + folly::StringPiece msg; + ASSERT_TRUE(copies[j]->getLogMsg(id, msg)); + ASSERT_EQ(msgs[i], msg.toString()); + } + } + + LOG(INFO) << "Add learner, we need to catch up data!"; + auto f = leader->sendCommandAsync(test::encodeLearner(allHosts[3])); + f.wait(); + + sleep(1); + auto& learner = copies[3]; + ASSERT_EQ(100, learner->getNumLogs()); + id = learner->currLogId_ - 99; + for (int i = 0; i < 100; ++i, ++id) { + folly::StringPiece msg; + ASSERT_TRUE(learner->getLogMsg(id, msg)); + ASSERT_EQ(msgs[i], msg.toString()) << "id " << id << ", i " << i; + } + + finishRaft(services, copies, workers, leader); +} +} // namespace raftex +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + + // `flusher' is extern-declared in RaftexTestBase.h, defined in RaftexTestBase.cpp + using nebula::raftex::flusher; + flusher = std::make_unique(); + + return RUN_ALL_TESTS(); +} + + diff --git a/src/kvstore/raftex/test/LogAppendTest.cpp b/src/kvstore/raftex/test/LogAppendTest.cpp index f8a99789f90..a5f1c33b726 100644 --- a/src/kvstore/raftex/test/LogAppendTest.cpp +++ b/src/kvstore/raftex/test/LogAppendTest.cpp @@ -36,17 +36,9 @@ TEST(LogAppend, SimpleAppendWithOneCopy) { // Check all hosts agree on the same leader checkLeadership(copies, leader); - // Append 100 logs - LOG(INFO) << "=====> Start appending logs"; std::vector msgs; - for (int i = 1; i <= 100; ++i) { - msgs.emplace_back( - folly::stringPrintf("Test Log Message %03d", i)); - auto fut = leader->appendAsync(0, msgs.back()); - ASSERT_EQ(AppendLogResult::SUCCEEDED, std::move(fut).get()); - } - LOG(INFO) << "<===== Finish appending logs"; - + LogID id = -1; + appendLogs(1, 100, leader, msgs, id); // Sleep a while to make sure the last log has been committed on // followers sleep(FLAGS_heartbeat_interval); @@ -56,7 +48,6 @@ TEST(LogAppend, SimpleAppendWithOneCopy) { ASSERT_EQ(100, c->getNumLogs()); } - LogID id = leader->firstCommittedLogId_; for (int i = 0; i < 100; ++i, ++id) { for (auto& c : copies) { folly::StringPiece msg; @@ -83,17 +74,9 @@ TEST(LogAppend, SimpleAppendWithThreeCopies) { // Check all hosts agree on the same leader checkLeadership(copies, leader); - // Append 100 logs - LOG(INFO) << "=====> Start appending logs"; std::vector msgs; - for (int i = 1; i <= 100; ++i) { - msgs.emplace_back( - folly::stringPrintf("Test Log Message %03d", i)); - auto fut = leader->appendAsync(0, msgs.back()); - ASSERT_EQ(AppendLogResult::SUCCEEDED, std::move(fut).get()); - } - LOG(INFO) << "<===== Finish appending logs"; - + LogID id = -1; + appendLogs(1, 100, leader, msgs, id); // Sleep a while to make sure the last log has been committed on // followers sleep(FLAGS_heartbeat_interval); @@ -103,7 +86,6 @@ TEST(LogAppend, SimpleAppendWithThreeCopies) { ASSERT_EQ(100, c->getNumLogs()); } - LogID id = leader->firstCommittedLogId_; for (int i = 0; i < 100; ++i, ++id) { for (auto& c : copies) { folly::StringPiece msg; @@ -172,7 +154,8 @@ TEST(LogAppend, MultiThreadAppend) { ASSERT_EQ(numThreads * numLogs, c->getNumLogs()); } - LogID id = leader->firstCommittedLogId_; + // The first log should be heart beat + LogID id = 2; for (int i = 0; i < numThreads * numLogs; ++i, ++id) { folly::StringPiece msg; ASSERT_TRUE(leader->getLogMsg(id, msg)); diff --git a/src/kvstore/raftex/test/LogCASTest.cpp b/src/kvstore/raftex/test/LogCASTest.cpp index 58dad056b3d..385ef25a83e 100644 --- a/src/kvstore/raftex/test/LogCASTest.cpp +++ b/src/kvstore/raftex/test/LogCASTest.cpp @@ -42,6 +42,7 @@ TEST_F(LogCASTest, StartWithValidCAS) { fut.wait(); } } + LogID id = leader_->currLogId_ - 9; LOG(INFO) << "<===== Finish appending logs"; // Sleep a while to make sure the last log has been committed on @@ -53,7 +54,6 @@ TEST_F(LogCASTest, StartWithValidCAS) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -77,6 +77,7 @@ TEST_F(LogCASTest, StartWithInvalidCAS) { fut.wait(); } } + LogID id = leader_->currLogId_ - 9; LOG(INFO) << "<===== Finish appending logs"; // Sleep a while to make sure the last log has been committed on @@ -88,7 +89,6 @@ TEST_F(LogCASTest, StartWithInvalidCAS) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -120,6 +120,7 @@ TEST_F(LogCASTest, ValidCASInMiddle) { fut.wait(); } } + LogID id = leader_->currLogId_ - 9; LOG(INFO) << "<===== Finish appending logs"; // Sleep a while to make sure the last log has been committed on @@ -131,7 +132,6 @@ TEST_F(LogCASTest, ValidCASInMiddle) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -162,6 +162,7 @@ TEST_F(LogCASTest, InvalidCASInMiddle) { fut.wait(); } } + LogID id = leader_->currLogId_ - 9; LOG(INFO) << "<===== Finish appending logs"; // Sleep a while to make sure the last log has been committed on @@ -173,7 +174,6 @@ TEST_F(LogCASTest, InvalidCASInMiddle) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -198,6 +198,7 @@ TEST_F(LogCASTest, EndWithValidCAS) { auto fut = leader_->casAsync("TCAS Log Message"); msgs.emplace_back("CAS Log Message"); fut.wait(); + LogID id = leader_->currLogId_ - 9; LOG(INFO) << "<===== Finish appending logs"; // Sleep a while to make sure the last log has been committed on @@ -209,7 +210,6 @@ TEST_F(LogCASTest, EndWithValidCAS) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -232,6 +232,7 @@ TEST_F(LogCASTest, EndWithInvalidCAS) { leader_->casAsync("FCAS Log Message"); auto fut = leader_->casAsync("FCAS Log Message"); fut.wait(); + LogID id = leader_->currLogId_ - 7; LOG(INFO) << "<===== Finish appending logs"; // Sleep a while to make sure the last log has been committed on @@ -243,7 +244,6 @@ TEST_F(LogCASTest, EndWithInvalidCAS) { ASSERT_EQ(8, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 8; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -265,6 +265,7 @@ TEST_F(LogCASTest, AllValidCAS) { fut.wait(); } } + LogID id = leader_->currLogId_ - 9; LOG(INFO) << "<===== Finish appending logs"; // Sleep a while to make sure the last log has been committed on @@ -276,7 +277,6 @@ TEST_F(LogCASTest, AllValidCAS) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; diff --git a/src/kvstore/raftex/test/LogCommandTest.cpp b/src/kvstore/raftex/test/LogCommandTest.cpp index e8e8099c8d4..f1656334f3a 100644 --- a/src/kvstore/raftex/test/LogCommandTest.cpp +++ b/src/kvstore/raftex/test/LogCommandTest.cpp @@ -42,6 +42,7 @@ TEST_F(LogCommandTest, StartWithCommandLog) { fut.wait(); } } + LogID id = leader_->currLogId_ - 9; LOG(INFO) << "<===== Finish appending logs"; ASSERT_EQ(2, leader_->commitTimes_); @@ -54,7 +55,6 @@ TEST_F(LogCommandTest, StartWithCommandLog) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -85,6 +85,7 @@ TEST_F(LogCommandTest, CommandInMiddle) { fut.wait(); } } + LogID id = leader_->currLogId_ - 9; LOG(INFO) << "<===== Finish appending logs"; ASSERT_EQ(3, leader_->commitTimes_); @@ -97,7 +98,6 @@ TEST_F(LogCommandTest, CommandInMiddle) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -119,6 +119,7 @@ TEST_F(LogCommandTest, EndWithCommand) { auto fut = leader_->sendCommandAsync("Command Log Message"); msgs.emplace_back("Command Log Message"); fut.wait(); + LogID id = leader_->currLogId_ - 9; LOG(INFO) << "<===== Finish appending logs"; ASSERT_EQ(2, leader_->commitTimes_); @@ -131,7 +132,6 @@ TEST_F(LogCommandTest, EndWithCommand) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -152,6 +152,7 @@ TEST_F(LogCommandTest, AllCommandLogs) { fut.wait(); } } + LogID id = leader_->currLogId_ - 9; LOG(INFO) << "<===== Finish appending logs"; // Sleep a while to make sure the last log has been committed on @@ -164,7 +165,6 @@ TEST_F(LogCommandTest, AllCommandLogs) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -216,6 +216,7 @@ TEST_F(LogCommandTest, MixedLogs) { leader_->casAsync("FCAS Log Message"); f.wait(); + LogID id = leader_->currLogId_ - 9; LOG(INFO) << "<===== Finish appending logs"; // Sleep a while to make sure the last log has been committed on @@ -228,7 +229,6 @@ TEST_F(LogCommandTest, MixedLogs) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; diff --git a/src/kvstore/raftex/test/RaftexTestBase.cpp b/src/kvstore/raftex/test/RaftexTestBase.cpp index b6d47a25631..8cb8b48e6b2 100644 --- a/src/kvstore/raftex/test/RaftexTestBase.cpp +++ b/src/kvstore/raftex/test/RaftexTestBase.cpp @@ -25,15 +25,21 @@ std::condition_variable leaderCV; std::vector getPeers(const std::vector& all, - const HostAddr& self) { + const HostAddr& self, + std::vector isLearner) { + if (isLearner.empty()) { + isLearner.resize(all.size(), false); + } std::vector peers; + size_t index = 0; for (const auto& host : all) { - if (host != self) { + if (host != self && !isLearner[index]) { VLOG(2) << "Adding host " << NetworkUtils::intToIPv4(host.first) << ":" << host.second; peers.emplace_back(host); } + index++; } return peers; @@ -77,7 +83,11 @@ void onLeadershipLost( void waitUntilLeaderElected( const std::vector>& copies, - std::shared_ptr& leader) { + std::shared_ptr& leader, + std::vector isLearner) { + if (isLearner.empty()) { + isLearner.resize(copies.size(), false); + } while (true) { { std::unique_lock lock(leaderMutex); @@ -90,16 +100,18 @@ void waitUntilLeaderElected( }); // Sleep some time to wait until resp of heartbeat has come back when elected as leader - usleep(10000); + usleep(30000); bool sameLeader = true; + int32_t index = 0; for (auto& c : copies) { - if (c != nullptr && leader != c) { + if (!isLearner[index] && c != nullptr && leader != c) { if (leader->address() != c->leader()) { sameLeader = false; break; } } + index++; } if (sameLeader) { break; @@ -120,7 +132,8 @@ void setupRaft( std::vector& allHosts, std::vector>& services, std::vector>& copies, - std::shared_ptr& leader) { + std::shared_ptr& leader, + std::vector isLearner) { IPv4 ipInt; CHECK(NetworkUtils::ipv4ToInt("127.0.0.1", ipInt)); @@ -144,6 +157,9 @@ void setupRaft( allHosts.emplace_back(ipInt, port); } + if (isLearner.empty()) { + isLearner.resize(allHosts.size(), false); + } // Create one copy of the shard for each service for (size_t i = 0; i < services.size(); i++) { copies.emplace_back(std::make_shared( @@ -168,11 +184,12 @@ void setupRaft( std::placeholders::_2, std::placeholders::_3))); services[i]->addPartition(copies.back()); - copies.back()->start(getPeers(allHosts, allHosts[i])); + copies.back()->start(getPeers(allHosts, allHosts[i], isLearner), + isLearner[i]); } // Wait untill all copies agree on the same leader - waitUntilLeaderElected(copies, leader); + waitUntilLeaderElected(copies, leader, isLearner); } @@ -187,8 +204,10 @@ void finishRaft(std::vector>& services, for (auto& svc : services) { svc->stop(); } + LOG(INFO) << "Stopping workers..."; workers->stop(); workers->wait(); + LOG(INFO) << "Waiting for all service stopped"; for (auto& svc : services) { svc->waitUntilStop(); } @@ -201,12 +220,45 @@ void checkLeadership(std::vector>& copies, ASSERT_FALSE(!leader); for (auto& c : copies) { - if (c != nullptr && leader != c) { + if (c != nullptr && leader != c && !c->isLearner()) { ASSERT_EQ(leader->address(), c->leader()); } } } +/** + * Check copies[index] is the leader. + * */ +void checkLeadership(std::vector>& copies, + size_t index, + std::shared_ptr& leader) { + std::lock_guard lock(leaderMutex); + ASSERT_FALSE(!leader); + ASSERT_EQ(leader->address(), copies[index]->address()); +} + +void appendLogs(int start, + int end, + std::shared_ptr leader, + std::vector& msgs, + LogID& firstLogId) { + // Append 100 logs + LOG(INFO) << "=====> Start appending logs"; + firstLogId = -1; + for (int i = start; i <= end; ++i) { + msgs.emplace_back( + folly::stringPrintf("Test Log Message %03d", i)); + auto fut = leader->appendAsync(0, msgs.back()); + ASSERT_EQ(AppendLogResult::SUCCEEDED, + std::move(fut).get()); + if (firstLogId < 0) { + firstLogId = leader->currLogId_; + } + } + LOG(INFO) << "<===== Finish appending logs"; +} + + } // namespace raftex } // namespace nebula diff --git a/src/kvstore/raftex/test/RaftexTestBase.h b/src/kvstore/raftex/test/RaftexTestBase.h index 10eaa8d246a..2b9bb046314 100644 --- a/src/kvstore/raftex/test/RaftexTestBase.h +++ b/src/kvstore/raftex/test/RaftexTestBase.h @@ -38,7 +38,8 @@ extern std::condition_variable leaderCV; std::vector getPeers(const std::vector& all, - const HostAddr& self); + const HostAddr& self, + std::vector isLearner = {}); void onLeaderElected( std::vector>& copies, @@ -56,7 +57,8 @@ void onLeadershipLost( void waitUntilLeaderElected( const std::vector>& copies, - std::shared_ptr& leader); + std::shared_ptr& leader, + std::vector isLearner = {}); void setupRaft( int32_t numCopies, @@ -66,7 +68,8 @@ void setupRaft( std::vector& allHosts, std::vector>& services, std::vector>& copies, - std::shared_ptr& leader); + std::shared_ptr& leader, + std::vector isLearner = {}); void finishRaft(std::vector>& services, std::vector>& copies, @@ -76,6 +79,15 @@ void finishRaft(std::vector>& services, void checkLeadership(std::vector>& copies, std::shared_ptr& leader); +void checkLeadership(std::vector>& copies, + size_t index, + std::shared_ptr& leader); + +void appendLogs(int start, + int end, + std::shared_ptr leader, + std::vector& msgs, + LogID& firstLogId); class RaftexTestFixture : public ::testing::Test { public: diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp index 14f3fdc8d01..1cecc8dd9e8 100644 --- a/src/kvstore/raftex/test/TestShard.cpp +++ b/src/kvstore/raftex/test/TestShard.cpp @@ -14,6 +14,22 @@ namespace nebula { namespace raftex { namespace test { +std::string encodeLearner(const HostAddr& addr) { + std::string str; + CommandType type = CommandType::ADD_LEARNER; + str.append(reinterpret_cast(&type), 1); + str.append(reinterpret_cast(&addr), sizeof(HostAddr)); + return str; +} + +HostAddr decodeLearner(const folly::StringPiece& log) { + HostAddr learner; + memcpy(&learner.first, log.begin() + 1, sizeof(learner.first)); + memcpy(&learner.second, log.begin() + 1 + sizeof(learner.first), sizeof(learner.second)); + return learner; +} + + TestShard::TestShard(size_t idx, std::shared_ptr svc, PartitionID partId, @@ -75,11 +91,19 @@ bool TestShard::commitLogs(std::unique_ptr iter) { firstId = iter->logId(); } lastId = iter->logId(); - if (!iter->logMsg().empty()) { - if (firstCommittedLogId_ < 0) { - firstCommittedLogId_ = iter->logId(); + auto log = iter->logMsg(); + if (!log.empty()) { + switch (static_cast(log[0])) { + case CommandType::ADD_LEARNER: { + break; + } + default: { + VLOG(1) << idStr_ << "Write " << iter->logId() << ":" << log; + data_.emplace(iter->logId(), log.toString()); + currLogId_ = iter->logId(); + break; + } } - data_.emplace(iter->logId(), iter->logMsg().toString()); commitLogsNum++; } ++(*iter); diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h index 80715d4d103..cc38c4e2175 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -17,6 +17,14 @@ class RaftexService; namespace test { +enum class CommandType : int8_t { + ADD_LEARNER = 0x01, +}; + +std::string encodeLearner(const HostAddr& addr); + +HostAddr decodeLearner(const folly::StringPiece& log); + class TestShard : public RaftPart { public: TestShard( @@ -54,7 +62,20 @@ class TestShard : public RaftPart { bool preProcessLog(LogID, TermID, ClusterID, - const std::string&) override { + const std::string& log) override { + if (!log.empty()) { + switch (static_cast(log[0])) { + case CommandType::ADD_LEARNER: { + auto learner = decodeLearner(log); + addLearner(learner); + LOG(INFO) << idStr_ << "Add learner " << learner; + break; + } + default: { + break; + } + } + } return true; } @@ -63,7 +84,7 @@ class TestShard : public RaftPart { public: int32_t commitTimes_ = 0; - int32_t firstCommittedLogId_ = -1; + int32_t currLogId_ = -1; private: const size_t idx_; diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index dd1b6bcfa96..d5066eefcde 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -93,10 +93,12 @@ TEST(NebulaStoreTest, SimpleTest) { sizeof(int32_t)), folly::stringPrintf("val_%d", i)); } - store->asyncMultiPut(1, 1, std::move(data), [](ResultCode code){ + folly::Baton baton; + store->asyncMultiPut(1, 1, std::move(data), [&] (ResultCode code){ EXPECT_EQ(ResultCode::SUCCEEDED, code); + baton.post(); }); - + baton.wait(); int32_t start = 0; int32_t end = 100; std::string s(reinterpret_cast(&start), sizeof(int32_t)); diff --git a/src/meta/test/ActiveHostsManTest.cpp b/src/meta/test/ActiveHostsManTest.cpp index c6980e09f2f..9addfa02143 100644 --- a/src/meta/test/ActiveHostsManTest.cpp +++ b/src/meta/test/ActiveHostsManTest.cpp @@ -5,6 +5,7 @@ */ #include "base/Base.h" #include +#include #include "meta/ActiveHostsMan.h" #include "fs/TempDir.h" #include "meta/test/TestUtils.h" @@ -46,10 +47,13 @@ TEST(ActiveHostsManTest, MergeHostInfo) { data.emplace_back(MetaServiceUtils::hostKey(0, i), MetaServiceUtils::hostValOnline()); } + folly::Baton baton; kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), - [] (kvstore::ResultCode code) { + [&] (kvstore::ResultCode code) { CHECK_EQ(code, kvstore::ResultCode::SUCCEEDED); + baton.post(); }); + baton.wait(); } int onlineNum = 0; diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index 1c0cd9a9ca1..b2e7d2e4b87 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -16,6 +16,7 @@ #include "meta/processors/partsMan/ListHostsProcessor.h" #include "meta/MetaServiceHandler.h" #include +#include #include "meta/processors/usersMan/AuthenticationProcessor.h" #include "interface/gen-cpp2/common_types.h" #include "time/WallClock.h" @@ -117,11 +118,13 @@ class TestUtils { data.emplace_back(MetaServiceUtils::partKey(id, partId), MetaServiceUtils::partVal(hosts)); } - + folly::Baton baton; kv->asyncMultiPut(0, 0, std::move(data), [&] (kvstore::ResultCode code) { ret = (code == kvstore::ResultCode::SUCCEEDED); + baton.post(); }); + baton.wait(); return ret; } @@ -143,11 +146,13 @@ class TestUtils { tags.emplace_back(MetaServiceUtils::schemaTagKey(1, tagId, ver++), MetaServiceUtils::schemaTagVal(tagName, srcsch)); } - + folly::Baton baton; kv->asyncMultiPut(0, 0, std::move(tags), - [] (kvstore::ResultCode code) { + [&] (kvstore::ResultCode code) { ASSERT_EQ(kvstore::ResultCode::SUCCEEDED, code); + baton.post(); }); + baton.wait(); } static void mockEdge(kvstore::KVStore* kv, int32_t edgeNum, SchemaVer version = 0) { @@ -170,9 +175,12 @@ class TestUtils { MetaServiceUtils::schemaEdgeVal(edgeName, srcsch)); } - kv->asyncMultiPut(0, 0, std::move(edges), [] (kvstore::ResultCode code) { + folly::Baton baton; + kv->asyncMultiPut(0, 0, std::move(edges), [&] (kvstore::ResultCode code) { ASSERT_EQ(kvstore::ResultCode::SUCCEEDED, code); + baton.post(); }); + baton.wait(); } static std::unique_ptr mockMetaServer(uint16_t port, diff --git a/src/storage/test/CompactionTest.cpp b/src/storage/test/CompactionTest.cpp index df9273da7ef..7f09daecfb9 100644 --- a/src/storage/test/CompactionTest.cpp +++ b/src/storage/test/CompactionTest.cpp @@ -7,6 +7,7 @@ #include "base/Base.h" #include "base/NebulaKeyUtils.h" #include +#include #include "fs/TempDir.h" #include "storage/test/TestUtils.h" #include "storage/CompactionFilter.h" @@ -61,11 +62,14 @@ void mockData(kvstore::KVStore* kv) { } } } + folly::Baton baton; kv->asyncMultiPut( 0, partId, std::move(data), [&](kvstore::ResultCode code) { EXPECT_EQ(code, kvstore::ResultCode::SUCCEEDED); + baton.post(); }); + baton.wait(); } } diff --git a/src/storage/test/QueryBoundTest.cpp b/src/storage/test/QueryBoundTest.cpp index b33fa6da7df..d1cbc266df2 100644 --- a/src/storage/test/QueryBoundTest.cpp +++ b/src/storage/test/QueryBoundTest.cpp @@ -67,11 +67,14 @@ void mockData(kvstore::KVStore* kv) { } } } + folly::Baton baton; kv->asyncMultiPut( 0, partId, std::move(data), [&](kvstore::ResultCode code) { EXPECT_EQ(code, kvstore::ResultCode::SUCCEEDED); + baton.post(); }); + baton.wait(); } } diff --git a/src/storage/test/QueryEdgePropsTest.cpp b/src/storage/test/QueryEdgePropsTest.cpp index 40050a44fc0..9662681cb54 100644 --- a/src/storage/test/QueryEdgePropsTest.cpp +++ b/src/storage/test/QueryEdgePropsTest.cpp @@ -36,11 +36,14 @@ void mockData(kvstore::KVStore* kv) { data.emplace_back(std::move(key), std::move(val)); } } + folly::Baton baton; kv->asyncMultiPut( 0, partId, std::move(data), [&](kvstore::ResultCode code) { EXPECT_EQ(code, kvstore::ResultCode::SUCCEEDED); + baton.post(); }); + baton.wait(); } } diff --git a/src/storage/test/QueryStatsTest.cpp b/src/storage/test/QueryStatsTest.cpp index 879474ae83b..cb87829fe0e 100644 --- a/src/storage/test/QueryStatsTest.cpp +++ b/src/storage/test/QueryStatsTest.cpp @@ -48,9 +48,12 @@ void mockData(kvstore::KVStore* kv) { data.emplace_back(std::move(key), std::move(val)); } } + folly::Baton baton; kv->asyncMultiPut(0, partId, std::move(data), [&](kvstore::ResultCode code) { EXPECT_EQ(code, kvstore::ResultCode::SUCCEEDED); + baton.post(); }); + baton.wait(); } } diff --git a/src/storage/test/QueryVertexPropsTest.cpp b/src/storage/test/QueryVertexPropsTest.cpp index cf3e546646e..1bf585d9cd9 100644 --- a/src/storage/test/QueryVertexPropsTest.cpp +++ b/src/storage/test/QueryVertexPropsTest.cpp @@ -41,15 +41,17 @@ TEST(QueryVertexPropsTest, SimpleTest) { data.emplace_back(std::move(key), std::move(val)); } } + folly::Baton baton; kv->asyncMultiPut( 0, partId, std::move(data), [&](kvstore::ResultCode code) { EXPECT_EQ(code, kvstore::ResultCode::SUCCEEDED); + baton.post(); }); + baton.wait(); } - LOG(INFO) << "Build VertexPropsRequest..."; cpp2::VertexPropRequest req; req.set_space_id(0); diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp index 94e0ed97f07..95160589976 100644 --- a/src/storage/test/StorageClientTest.cpp +++ b/src/storage/test/StorageClientTest.cpp @@ -72,8 +72,25 @@ TEST(StorageClientTest, VerticesInterfacesTest) { ASSERT_TRUE(ret.ok()) << ret.status(); spaceId = ret.value(); LOG(INFO) << "Created space \"default\", its id is " << spaceId; - sleep(2 * FLAGS_load_data_interval_secs + 1); - + sleep(FLAGS_load_data_interval_secs + 1); + auto* nKV = static_cast(sc->kvStore_.get()); + while (true) { + int readyNum = 0; + for (auto partId = 1; partId <= 10; partId++) { + auto retLeader = nKV->partLeader(spaceId, partId); + if (ok(retLeader)) { + auto leader = value(std::move(retLeader)); + if (leader != HostAddr(0, 0)) { + readyNum++; + } + } + } + if (readyNum == 10) { + LOG(INFO) << "All leaders have been elected!"; + break; + } + usleep(100000); + } auto client = std::make_unique(threadPool, mClient.get()); // VerticesInterfacesTest(addVertices and getVertexProps) diff --git a/src/storage/test/TestUtils.h b/src/storage/test/TestUtils.h index e79b3031c85..7e2c699b8d9 100644 --- a/src/storage/test/TestUtils.h +++ b/src/storage/test/TestUtils.h @@ -17,6 +17,7 @@ #include "dataman/ResultSchemaProvider.h" #include "storage/StorageServiceHandler.h" #include +#include #include "meta/SchemaManager.h" #include #include