Skip to content

Commit

Permalink
Support Learner in Raft
Browse files Browse the repository at this point in the history
  • Loading branch information
heng committed May 29, 2019
1 parent 06a8356 commit 2bbf104
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 109 deletions.
10 changes: 6 additions & 4 deletions src/common/base/CollectNSucceeded.inl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(
size_t n,
ResultEval&& eval) {
using Result = SucceededResultList<FutureIter>;
if (n == 0) {
return folly::Future<Result>(Result());
}

struct Context {
Context(size_t total, ResultEval&& e)
Expand All @@ -30,7 +33,6 @@ folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(
};

size_t total = size_t(std::distance(first, last));
DCHECK_GT(n, 0U);
DCHECK_GE(total, 0U);

if (total < n) {
Expand All @@ -45,11 +47,11 @@ folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(
// for each succeeded Future, add to the result list, until
// we have required number of futures, at which point we fulfil
// the promise with the result list
for (; first != last; ++first) {
first->setCallback_([n, ctx] (
for (size_t index = 0; first != last; ++first, ++index) {
first->setCallback_([n, ctx, index] (
folly::Try<FutureReturnType<FutureIter>>&& t) {
if (!ctx->promise.isFulfilled()) {
if (!t.hasException() && ctx->eval(t.value())) {
if (!t.hasException() && ctx->eval(index, t.value())) {
ctx->results.emplace_back(std::move(t.value()));
}
if ((++ctx->numCompleted) == ctx->nTotal ||
Expand Down
15 changes: 14 additions & 1 deletion src/kvstore/raftex/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ namespace raftex {

using nebula::network::NetworkUtils;

Host::Host(const HostAddr& addr, std::shared_ptr<RaftPart> part)
Host::Host(const HostAddr& addr, std::shared_ptr<RaftPart> part, bool isLearner)
: part_(std::move(part))
, addr_(addr)
, isLearner_(isLearner)
, idStr_(folly::stringPrintf(
"[Host: %s:%d] ",
NetworkUtils::intToIPv4(addr_.first).c_str(),
Expand All @@ -39,6 +40,7 @@ void Host::waitForStop() {
noMoreRequestCV_.wait(g, [this] {
return !requestOnGoing_;
});
LOG(INFO) << idStr_ << "The host has been stopped!";
}


Expand All @@ -61,6 +63,17 @@ cpp2::ErrorCode Host::checkStatus(std::lock_guard<std::mutex>& lck) const {

folly::Future<cpp2::AskForVoteResponse> Host::askForVote(
const cpp2::AskForVoteRequest& req) {
{
std::lock_guard<std::mutex> g(lock_);
auto res = checkStatus(g);
if (res != cpp2::ErrorCode::SUCCEEDED) {
VLOG(2) << idStr_
<< "The Host is not in a proper status, do not send";
cpp2::AskForVoteResponse resp;
resp.set_error_code(res);
return resp;
}
}
auto client = tcManager().client(addr_);
return client->future_askForVote(req);
}
Expand Down
12 changes: 10 additions & 2 deletions src/kvstore/raftex/Host.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ class RaftPart;

class Host final : public std::enable_shared_from_this<Host> {
public:
Host(const HostAddr& addr, std::shared_ptr<RaftPart> part);
Host(const HostAddr& addr, std::shared_ptr<RaftPart> part, bool isLearner = false);

~Host() {
LOG(INFO) << idStr_ << " The host has been destroyed!";
}

const char* idStr() const {
return idStr_.c_str();
Expand All @@ -50,6 +54,10 @@ class Host final : public std::enable_shared_from_this<Host> {

void waitForStop();

bool isLearner() const {
return isLearner_;
}

folly::Future<cpp2::AskForVoteResponse> askForVote(
const cpp2::AskForVoteRequest& req);

Expand Down Expand Up @@ -84,9 +92,9 @@ class Host final : public std::enable_shared_from_this<Host> {
private:
std::shared_ptr<RaftPart> part_;
const HostAddr addr_;
bool isLearner_ = false;
const std::string idStr_;


mutable std::mutex lock_;

bool paused_{false};
Expand Down
Loading

0 comments on commit 2bbf104

Please sign in to comment.