Skip to content

Commit

Permalink
Support Learner in Raft
Browse files Browse the repository at this point in the history
  • Loading branch information
heng committed Jul 18, 2019
1 parent 93ed0e3 commit b3f8684
Show file tree
Hide file tree
Showing 31 changed files with 645 additions and 157 deletions.
10 changes: 6 additions & 4 deletions src/common/base/CollectNSucceeded.inl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(
size_t n,
ResultEval&& eval) {
using Result = SucceededResultList<FutureIter>;
if (n == 0) {
return folly::Future<Result>(Result());
}

struct Context {
Context(size_t total, ResultEval&& e)
Expand All @@ -30,7 +33,6 @@ folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(
};

size_t total = size_t(std::distance(first, last));
DCHECK_GT(n, 0U);
DCHECK_GE(total, 0U);

if (total < n) {
Expand All @@ -45,11 +47,11 @@ folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(
// for each succeeded Future, add to the result list, until
// we have required number of futures, at which point we fulfil
// the promise with the result list
for (; first != last; ++first) {
first->setCallback_([n, ctx] (
for (size_t index = 0; first != last; ++first, ++index) {
first->setCallback_([n, ctx, index] (
folly::Try<FutureReturnType<FutureIter>>&& t) {
if (!ctx->promise.isFulfilled()) {
if (!t.hasException() && ctx->eval(t.value())) {
if (!t.hasException() && ctx->eval(index, t.value())) {
ctx->results.emplace_back(std::move(t.value()));
}
if ((++ctx->numCompleted) == ctx->nTotal ||
Expand Down
5 changes: 4 additions & 1 deletion src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct StoreCapability {
};
#define SUPPORT_FILTERING(store) (store.capability() & StoreCapability::SC_FILTERING)


class Part;
/**
* Interface for all kv-stores
**/
Expand Down Expand Up @@ -136,6 +136,9 @@ class KVStore {
const std::string& prefix,
KVCallback cb) = 0;

virtual ErrorOr<ResultCode, std::shared_ptr<Part>> part(GraphSpaceID spaceId,
PartitionID partId) = 0;

protected:
KVStore() = default;
};
Expand Down
23 changes: 23 additions & 0 deletions src/kvstore/LogEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,29 @@ std::vector<folly::StringPiece> decodeMultiValues(folly::StringPiece encoded) {
return values;
}

std::string encodeLearner(const HostAddr& learner) {
std::string encoded;
encoded.reserve(kHeadLen + sizeof(HostAddr));
// Timestamp (8 bytes)
int64_t ts = time::WallClock::fastNowInMilliSec();
encoded.append(reinterpret_cast<char*>(&ts), sizeof(int64_t));
// Log type
auto type = LogType::OP_ADD_LEARNER;
encoded.append(reinterpret_cast<char*>(&type), 1);
encoded.append(reinterpret_cast<const char*>(&learner), sizeof(HostAddr));
return encoded;
}

HostAddr decodeLearner(folly::StringPiece encoded) {
HostAddr addr;
CHECK_EQ(kHeadLen + sizeof(HostAddr), encoded.size());
memcpy(&addr.first, encoded.begin() + kHeadLen, sizeof(addr.first));
memcpy(&addr.second,
encoded.begin() + kHeadLen + sizeof(addr.first),
sizeof(addr.second));
return addr;
}

} // namespace kvstore
} // namespace nebula

4 changes: 4 additions & 0 deletions src/kvstore/LogEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum LogType : char {
OP_MULTI_REMOVE = 0x4,
OP_REMOVE_PREFIX = 0x5,
OP_REMOVE_RANGE = 0x6,
OP_ADD_LEARNER = 0x07,
};


Expand All @@ -32,6 +33,9 @@ std::string encodeMultiValues(LogType type,
folly::StringPiece v2);
std::vector<folly::StringPiece> decodeMultiValues(folly::StringPiece encoded);

std::string encodeLearner(const HostAddr& learner);
HostAddr decodeLearner(folly::StringPiece encoded);

} // namespace kvstore
} // namespace nebula
#endif // KVSTORE_LOGENCODER_H_
63 changes: 49 additions & 14 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,29 +339,41 @@ void NebulaStore::asyncMultiPut(GraphSpaceID spaceId,
PartitionID partId,
std::vector<KV> keyValues,
KVCallback cb) {
folly::RWSpinLock::ReadHolder rh(&lock_);
CHECK_FOR_WRITE(spaceId, partId, cb);
return partIt->second->asyncMultiPut(std::move(keyValues), std::move(cb));
auto ret = part(spaceId, partId);
if (!ok(ret)) {
cb(error(ret));
return;
}
auto part = nebula::value(ret);
return part->asyncMultiPut(std::move(keyValues), std::move(cb));
}


void NebulaStore::asyncRemove(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
KVCallback cb) {
folly::RWSpinLock::ReadHolder rh(&lock_);
CHECK_FOR_WRITE(spaceId, partId, cb);
return partIt->second->asyncRemove(key, std::move(cb));
auto ret = part(spaceId, partId);
if (!ok(ret)) {
cb(error(ret));
return;
}
auto part = nebula::value(ret);
return part->asyncRemove(key, std::move(cb));
}


void NebulaStore::asyncMultiRemove(GraphSpaceID spaceId,
PartitionID partId,
std::vector<std::string> keys,
KVCallback cb) {
folly::RWSpinLock::ReadHolder rh(&lock_);
CHECK_FOR_WRITE(spaceId, partId, cb);
return partIt->second->asyncMultiRemove(std::move(keys), std::move(cb));
auto ret = part(spaceId, partId);
if (!ok(ret)) {
cb(error(ret));
return;
}
auto part = nebula::value(ret);
return part->asyncMultiRemove(std::move(keys), std::move(cb));
}


Expand All @@ -370,19 +382,42 @@ void NebulaStore::asyncRemoveRange(GraphSpaceID spaceId,
const std::string& start,
const std::string& end,
KVCallback cb) {
folly::RWSpinLock::ReadHolder rh(&lock_);
CHECK_FOR_WRITE(spaceId, partId, cb);
return partIt->second->asyncRemoveRange(start, end, std::move(cb));
auto ret = part(spaceId, partId);
if (!ok(ret)) {
cb(error(ret));
return;
}
auto part = nebula::value(ret);
return part->asyncRemoveRange(start, end, std::move(cb));
}


void NebulaStore::asyncRemovePrefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
KVCallback cb) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
cb(error(ret));
return;
}
auto part = nebula::value(ret);
return part->asyncRemovePrefix(prefix, std::move(cb));
}

ErrorOr<ResultCode, std::shared_ptr<Part>> NebulaStore::part(GraphSpaceID spaceId,
PartitionID partId) {
folly::RWSpinLock::ReadHolder rh(&lock_);
CHECK_FOR_WRITE(spaceId, partId, cb);
return partIt->second->asyncRemovePrefix(prefix, std::move(cb));
auto it = spaces_.find(spaceId);
if (UNLIKELY(it == spaces_.end())) {
return ResultCode::ERR_SPACE_NOT_FOUND;
}
auto& parts = it->second->parts_;
auto partIt = parts.find(partId);
if (UNLIKELY(partIt == parts.end())) {
return ResultCode::ERR_PART_NOT_FOUND;
}
return partIt->second;
}


Expand Down
3 changes: 3 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ class NebulaStore : public KVStore, public Handler {
const std::string& prefix,
KVCallback cb) override;

ErrorOr<ResultCode, std::shared_ptr<Part>> part(GraphSpaceID spaceId,
PartitionID partId) override;

ResultCode ingest(GraphSpaceID spaceId,
const std::string& extra,
const std::vector<std::string>& files);
Expand Down
23 changes: 23 additions & 0 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ void Part::asyncRemoveRange(folly::StringPiece start,
});
}

void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) {
std::string log = encodeLearner(learner);
sendCommandAsync(std::move(log))
.then([callback = std::move(cb)] (AppendLogResult res) mutable {
callback(toResultCode(res));
});
}

void Part::onLostLeadership(TermID term) {
VLOG(1) << "Lost the leadership for the term " << term;
Expand Down Expand Up @@ -219,6 +226,9 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
}
break;
}
case OP_ADD_LEARNER: {
break;
}
default: {
LOG(FATAL) << "Unknown operation: " << static_cast<uint8_t>(log[0]);
}
Expand All @@ -243,6 +253,19 @@ bool Part::preProcessLog(LogID logId,
<< ", termId " << termId
<< ", clusterId " << clusterId
<< ", log " << log;
if (!log.empty()) {
switch (log[sizeof(int64_t)]) {
case OP_ADD_LEARNER: {
auto learner = decodeLearner(log);
addLearner(learner);
LOG(INFO) << idStr_ << "Add learner " << learner;
break;
}
default: {
break;
}
}
}
return true;
}

Expand Down
1 change: 1 addition & 0 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Part : public raftex::RaftPart {
folly::StringPiece end,
KVCallback cb);

void asyncAddLearner(const HostAddr& learner, KVCallback cb);
/**
* Methods inherited from RaftPart
*/
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/plugins/hbase/HBaseStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ class HBaseStore : public KVStore {
const std::string& prefix,
KVCallback cb) override;

ErrorOr<ResultCode, std::shared_ptr<Part>> part(GraphSpaceID,
PartitionID) override {
LOG(FATAL) << "Unsupported!";
}

private:
std::string getRowKey(const std::string& key) {
return key.substr(sizeof(PartitionID), key.size() - sizeof(PartitionID));
Expand Down
24 changes: 23 additions & 1 deletion src/kvstore/raftex/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ namespace raftex {

using nebula::network::NetworkUtils;

Host::Host(const HostAddr& addr, std::shared_ptr<RaftPart> part)
Host::Host(const HostAddr& addr, std::shared_ptr<RaftPart> part, bool isLearner)
: part_(std::move(part))
, addr_(addr)
, isLearner_(isLearner)
, idStr_(folly::stringPrintf(
"[Host: %s:%d] ",
NetworkUtils::intToIPv4(addr_.first).c_str(),
Expand All @@ -39,6 +40,7 @@ void Host::waitForStop() {
noMoreRequestCV_.wait(g, [this] {
return !requestOnGoing_;
});
LOG(INFO) << idStr_ << "The host has been stopped!";
}


Expand All @@ -61,6 +63,17 @@ cpp2::ErrorCode Host::checkStatus(std::lock_guard<std::mutex>& lck) const {

folly::Future<cpp2::AskForVoteResponse> Host::askForVote(
const cpp2::AskForVoteRequest& req) {
{
std::lock_guard<std::mutex> g(lock_);
auto res = checkStatus(g);
if (res != cpp2::ErrorCode::SUCCEEDED) {
VLOG(2) << idStr_
<< "The Host is not in a proper status, do not send";
cpp2::AskForVoteResponse resp;
resp.set_error_code(res);
return resp;
}
}
auto client = tcManager().client(addr_);
return client->future_askForVote(req);
}
Expand Down Expand Up @@ -193,6 +206,12 @@ folly::Future<cpp2::AppendLogResponse> Host::appendLogsInternal(
}

cpp2::AppendLogResponse resp = std::move(t).value();
VLOG(3) << self->idStr_ << "AppendLogResponse "
<< "code " << static_cast<int32_t>(resp.get_error_code())
<< ", currTerm " << resp.get_current_term()
<< ", lastLogId " << resp.get_last_log_id()
<< ", lastLogTerm " << resp.get_last_log_term()
<< ", commitLogId " << resp.get_committed_log_id();
switch (resp.get_error_code()) {
case cpp2::ErrorCode::SUCCEEDED: {
VLOG(2) << self->idStr_
Expand Down Expand Up @@ -378,6 +397,9 @@ folly::Future<cpp2::AppendLogResponse> Host::sendAppendLogRequest(
}
}

VLOG(3) << idStr_ << "sendAppendLogRequest, lastLogId " << req->get_last_log_id()
<< ", lastCommittedLogId " << req->get_committed_log_id()
<< ", lastLogIdSend " << req->get_last_log_id_sent();
// Get client connection
auto client = tcManager().client(addr_);
return client->future_appendLog(*req);
Expand Down
15 changes: 13 additions & 2 deletions src/kvstore/raftex/Host.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ class RaftPart;

class Host final : public std::enable_shared_from_this<Host> {
public:
Host(const HostAddr& addr, std::shared_ptr<RaftPart> part);
Host(const HostAddr& addr, std::shared_ptr<RaftPart> part, bool isLearner = false);

~Host() {
LOG(INFO) << idStr_ << " The host has been destroyed!";
}

const char* idStr() const {
return idStr_.c_str();
Expand All @@ -50,6 +54,10 @@ class Host final : public std::enable_shared_from_this<Host> {

void waitForStop();

bool isLearner() const {
return isLearner_;
}

folly::Future<cpp2::AskForVoteResponse> askForVote(
const cpp2::AskForVoteRequest& req);

Expand All @@ -62,6 +70,9 @@ class Host final : public std::enable_shared_from_this<Host> {
TermID lastLogTermSent, // The last log term being sent
LogID lastLogIdSent); // The last log id being sent

const HostAddr& address() const {
return addr_;
}

private:
cpp2::ErrorCode checkStatus(std::lock_guard<std::mutex>& lck) const;
Expand All @@ -84,9 +95,9 @@ class Host final : public std::enable_shared_from_this<Host> {
private:
std::shared_ptr<RaftPart> part_;
const HostAddr addr_;
bool isLearner_ = false;
const std::string idStr_;


mutable std::mutex lock_;

bool paused_{false};
Expand Down
Loading

0 comments on commit b3f8684

Please sign in to comment.