Enhance the failover logic for the balance procedure. (#2232)
* Check peers when the added part already exists

* Fix a bug in the follower catch-up check

Co-authored-by: heng <[email protected]>
dangleptr and heng authored Aug 5, 2020
1 parent b64f0b6 commit 28ddf2f
Showing 12 changed files with 129 additions and 40 deletions.
1 change: 1 addition & 0 deletions src/interface/storage.thrift
@@ -236,6 +236,7 @@ struct AddPartReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
3: bool as_learner,
4: list<common.HostAddr> peers,
}

struct RemovePartReq {
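The new peers field (4) lets the balancer ship the part's full raft peer list inside the AddPart request, so the receiving storage host does not have to rely on possibly stale local metadata while a balance is in flight. A minimal sketch of populating the extended request, assuming the thrift-generated C++ setters shown later in this diff (the ids and peer list are placeholders assumed to be in scope):

    // Sketch only: spaceId, partId and thriftPeers are assumed to be in scope.
    storage::cpp2::AddPartReq req;
    req.set_space_id(spaceId);
    req.set_part_id(partId);
    req.set_as_learner(false);
    req.set_peers(std::move(thriftPeers));  // the part's full raft peer list
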
54 changes: 36 additions & 18 deletions src/kvstore/NebulaStore.cpp
@@ -250,12 +250,20 @@ void NebulaStore::addSpace(GraphSpaceID spaceId) {
}


void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) {
void NebulaStore::addPart(GraphSpaceID spaceId,
PartitionID partId,
bool asLearner,
const std::vector<HostAddr>& peers) {
folly::RWSpinLock::WriteHolder wh(&lock_);
auto spaceIt = this->spaces_.find(spaceId);
CHECK(spaceIt != this->spaces_.end()) << "Space should exist!";
if (spaceIt->second->parts_.find(partId) != spaceIt->second->parts_.end()) {
LOG(INFO) << "[" << spaceId << "," << partId << "] has existed!";
auto partIt = spaceIt->second->parts_.find(partId);
if (partIt != spaceIt->second->parts_.end()) {
LOG(INFO) << "[Space: " << spaceId << ", Part: " << partId << "] has existed!";
if (!peers.empty()) {
LOG(INFO) << "[Space: " << spaceId << ", Part: " << partId << "] check peers...";
partIt->second->checkAndResetPeers(peers);
}
return;
}

@@ -277,15 +285,16 @@ void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearn
targetEngine->addPart(partId);
spaceIt->second->parts_.emplace(
partId,
newPart(spaceId, partId, targetEngine.get(), asLearner));
newPart(spaceId, partId, targetEngine.get(), asLearner, peers));
LOG(INFO) << "Space " << spaceId << ", part " << partId
<< " has been added, asLearner " << asLearner;
}

std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
PartitionID partId,
KVEngine* engine,
bool asLearner) {
bool asLearner,
const std::vector<HostAddr>& defaultPeers) {
auto part = std::make_shared<Part>(spaceId,
partId,
raftAddr_,
@@ -297,20 +306,29 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
bgWorkers_,
workers_,
snapshot_);
auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
if (!metaStatus.ok()) {
LOG(ERROR) << "options_.partMan_->partMeta(spaceId, partId); error: "
<< metaStatus.status().toString()
<< " spaceId: " << spaceId << ", partId: " << partId;
return nullptr;
}

auto partMeta = metaStatus.value();
std::vector<HostAddr> peers;
for (auto& h : partMeta.peers_) {
if (h != storeSvcAddr_) {
peers.emplace_back(getRaftAddr(h));
VLOG(1) << "Add peer " << peers.back();
if (defaultPeers.empty()) {
// pull the information from meta
auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
if (!metaStatus.ok()) {
LOG(ERROR) << "options_.partMan_->partMeta(spaceId, partId); error: "
<< metaStatus.status().toString()
<< " spaceId: " << spaceId << ", partId: " << partId;
return nullptr;
}

auto partMeta = metaStatus.value();
for (auto& h : partMeta.peers_) {
if (h != storeSvcAddr_) {
peers.emplace_back(getRaftAddr(h));
VLOG(1) << "Add peer " << peers.back();
}
}
} else {
for (auto& h : defaultPeers) {
if (h != raftAddr_) {
peers.emplace_back(h);
}
}
}
raftService_->addPartition(part);
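The heart of the NebulaStore change is the two-way peer resolution in newPart: with no explicit list, the peers still come from the meta service as before; otherwise the list handed in by the balancer wins. A standalone sketch of that rule, where HostAddr is a stand-in (ip, port) pair and toRaftAddr models NebulaStore::getRaftAddr:

    #include <cstdint>
    #include <utility>
    #include <vector>

    using HostAddr = std::pair<uint32_t, uint32_t>;  // (ip, port) stand-in

    std::vector<HostAddr> resolvePeers(const std::vector<HostAddr>& defaultPeers,
                                       const std::vector<HostAddr>& metaPeers,
                                       const HostAddr& storeSvcAddr,
                                       const HostAddr& raftAddr,
                                       HostAddr (*toRaftAddr)(const HostAddr&)) {
        std::vector<HostAddr> peers;
        if (defaultPeers.empty()) {
            // No explicit list was supplied: fall back to the meta view,
            // translating service addresses to raft addresses, skipping self.
            for (const auto& h : metaPeers) {
                if (h != storeSvcAddr) {
                    peers.emplace_back(toRaftAddr(h));
                }
            }
        } else {
            // The balancer supplied raft addresses directly; skip self.
            for (const auto& h : defaultPeers) {
                if (h != raftAddr) {
                    peers.emplace_back(h);
                }
            }
        }
        return peers;
    }
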
8 changes: 6 additions & 2 deletions src/kvstore/NebulaStore.h
@@ -210,7 +210,10 @@ class NebulaStore : public KVStore, public Handler {
* */
void addSpace(GraphSpaceID spaceId) override;

void addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) override;
void addPart(GraphSpaceID spaceId,
PartitionID partId,
bool asLearner,
const std::vector<HostAddr>& peers = {}) override;

void removeSpace(GraphSpaceID spaceId) override;

@@ -229,7 +232,8 @@
std::shared_ptr<Part> newPart(GraphSpaceID spaceId,
PartitionID partId,
KVEngine* engine,
bool asLearner);
bool asLearner,
const std::vector<HostAddr>& defaultPeers);

ErrorOr<ResultCode, KVEngine*> engine(GraphSpaceID spaceId, PartitionID partId);

24 changes: 18 additions & 6 deletions src/kvstore/Part.cpp
@@ -277,7 +277,9 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
if (ts > startTimeMs_) {
commitTransLeader(newLeader);
} else {
LOG(INFO) << idStr_ << "Skip commit stale transfer leader " << newLeader;
LOG(INFO) << idStr_ << "Skip commit stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
@@ -287,7 +289,9 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
if (ts > startTimeMs_) {
commitRemovePeer(peer);
} else {
LOG(INFO) << idStr_ << "Skip commit stale remove peer " << peer;
LOG(INFO) << idStr_ << "Skip commit stale remove peer " << peer
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
@@ -364,7 +368,9 @@ bool Part::preProcessLog(LogID logId,
LOG(INFO) << idStr_ << "preprocess add learner " << learner;
addLearner(learner);
} else {
LOG(INFO) << idStr_ << "Skip stale add learner " << learner;
LOG(INFO) << idStr_ << "Skip stale add learner " << learner
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
@@ -375,7 +381,9 @@
LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
} else {
LOG(INFO) << idStr_ << "Skip stale transfer leader " << newLeader;
LOG(INFO) << idStr_ << "Skip stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
@@ -386,7 +394,9 @@
LOG(INFO) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
} else {
LOG(INFO) << idStr_ << "Skip stale add peer " << peer;
LOG(INFO) << idStr_ << "Skip stale add peer " << peer
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
@@ -397,7 +407,9 @@
LOG(INFO) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
} else {
LOG(INFO) << idStr_ << "Skip stale remove peer " << peer;
LOG(INFO) << idStr_ << "Skip stale remove peer " << peer
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
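All the branches above share one guard: a membership-change log entry only takes effect when its timestamp is newer than the moment this replica opened the part, so entries replayed from the wal after a restart are skipped as stale, and the enriched log lines now record both timestamps. As a standalone predicate (a sketch; names are illustrative):

    #include <cstdint>

    // Entries timestamped before the part was (re)opened are treated as
    // stale and skipped during wal replay.
    bool shouldApply(int64_t logTimestampMs, int64_t partStartTimeMs) {
        return logTimestampMs > partStartTimeMs;
    }
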
2 changes: 1 addition & 1 deletion src/kvstore/PartManager.cpp
@@ -142,7 +142,7 @@ void MetaServerBasedPartManager::onSpaceOptionUpdated(

void MetaServerBasedPartManager::onPartAdded(const PartMeta& partMeta) {
if (handler_ != nullptr) {
handler_->addPart(partMeta.spaceId_, partMeta.partId_, false);
handler_->addPart(partMeta.spaceId_, partMeta.partId_, false, {});
} else {
VLOG(1) << "handler_ is nullptr!";
}
13 changes: 11 additions & 2 deletions src/kvstore/PartManager.h
@@ -18,13 +18,22 @@ namespace kvstore {
class Handler {
public:
virtual ~Handler() = default;

virtual void addSpace(GraphSpaceID spaceId) = 0;
virtual void addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) = 0;

virtual void addPart(GraphSpaceID spaceId,
PartitionID partId,
bool asLearner,
const std::vector<HostAddr>& peers) = 0;

virtual void updateSpaceOption(GraphSpaceID spaceId,
const std::unordered_map<std::string, std::string>& options,
bool isDbOption) = 0;

virtual void removeSpace(GraphSpaceID spaceId) = 0;

virtual void removePart(GraphSpaceID spaceId, PartitionID partId) = 0;

virtual int32_t allLeader(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderIds) = 0;
};
@@ -105,7 +114,7 @@ class MemPartManager final : public PartManager {
handler_->addSpace(spaceId);
}
if (noPart && handler_) {
handler_->addPart(spaceId, partId, false);
handler_->addPart(spaceId, partId, false, {});
}
}

9 changes: 7 additions & 2 deletions src/kvstore/raftex/RaftPart.cpp
@@ -1060,6 +1060,10 @@ typename RaftPart::Role RaftPart::processElectionResponses(
<< ", double my election interval.";
uint64_t curWeight = weight_.load();
weight_.store(curWeight * 2);
} else {
LOG(ERROR) << idStr_ << "Receive response about askForVote from "
<< hosts[r.first]->address()
<< ", error code is " << static_cast<int32_t>(r.second.get_error_code());
}
}

@@ -1619,7 +1623,7 @@ cpp2::ErrorCode RaftPart::verifyLeader(
return h->address() == candidate;
});
if (it == hosts.end()) {
VLOG(2) << idStr_ << "The candidate leader " << candidate << " is not my peers";
LOG(INFO) << idStr_ << "The candidate leader " << candidate << " is not my peers";
return cpp2::ErrorCode::E_WRONG_LEADER;
}

@@ -1833,7 +1837,8 @@ AppendLogResult RaftPart::isCatchedUp(const HostAddr& peer) {
}
for (auto& host : hosts_) {
if (host->addr_ == peer) {
if (host->followerCommittedLogId_ < wal_->firstLogId()) {
if (host->followerCommittedLogId_ == 0
|| host->followerCommittedLogId_ < wal_->firstLogId()) {
LOG(INFO) << idStr_ << "The committed log id of peer is "
<< host->followerCommittedLogId_
<< ", which is invalid or less than my first wal log id";
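The isCatchedUp fix treats a reported committed log id of 0 as "this follower has not confirmed anything yet", so the follower counts as lagging even when a plain less-than comparison against the leader's first wal id would not catch it. The corrected predicate, as a sketch:

    #include <cstdint>

    // A committed log id of 0 means the follower has confirmed nothing yet,
    // so it is treated as lagging regardless of the wal comparison.
    bool followerLagsBehindWal(int64_t followerCommittedLogId,
                               int64_t walFirstLogId) {
        return followerCommittedLogId == 0
            || followerCommittedLogId < walFirstLogId;
    }
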
6 changes: 4 additions & 2 deletions src/kvstore/raftex/test/MemberChangeTest.cpp
@@ -59,7 +59,8 @@ TEST(MemberChangeTest, AddRemovePeerTest) {
LOG(INFO) << "Add the same peer again!";
auto f = leader->sendCommandAsync(test::encodeAddPeer(allHosts[3]));
f.wait();

// sleep a while to ensure the learner receives the command.
sleep(1);
for (auto& c : copies) {
CHECK_EQ(3, c->hosts_.size());
}
@@ -68,7 +69,8 @@
LOG(INFO) << "Remove the peer added!";
auto f = leader->sendCommandAsync(test::encodeRemovePeer(allHosts[3]));
f.wait();

// sleep a while to ensure the learner receives the command.
sleep(1);
for (size_t i = 0; i < copies.size() - 1; i++) {
CHECK_EQ(2, copies[i]->hosts_.size());
}
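The added sleeps give the asynchronous add/remove-peer commands time to propagate to every copy before the assertions run. Where a fixed sleep proves flaky, a bounded polling wait is a common alternative; a self-contained sketch, not part of this commit:

    #include <chrono>
    #include <functional>
    #include <thread>

    // Poll `done` until it returns true or `timeout` expires, sleeping
    // briefly between checks.
    bool waitUntil(const std::function<bool()>& done,
                   std::chrono::milliseconds timeout,
                   std::chrono::milliseconds interval =
                       std::chrono::milliseconds(50)) {
        auto deadline = std::chrono::steady_clock::now() + timeout;
        while (std::chrono::steady_clock::now() < deadline) {
            if (done()) {
                return true;
            }
            std::this_thread::sleep_for(interval);
        }
        return done();
    }
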
27 changes: 23 additions & 4 deletions src/meta/processors/admin/AdminClient.cpp
@@ -74,6 +74,17 @@ folly::Future<Status> AdminClient::addPart(GraphSpaceID spaceId,
req.set_space_id(spaceId);
req.set_part_id(partId);
req.set_as_learner(asLearner);
auto ret = getPeers(spaceId, partId);
if (!ret.ok()) {
return ret.status();
}
auto peers = std::move(ret).value();
std::vector<nebula::cpp2::HostAddr> thriftPeers;
thriftPeers.resize(peers.size());
std::transform(peers.begin(), peers.end(), thriftPeers.begin(), [this](const auto& h) {
return toThriftHost(h);
});
req.set_peers(std::move(thriftPeers));
return getResponse(host, std::move(req), [] (auto client, auto request) {
return client->future_addPart(request);
}, [] (auto&& resp) -> Status {
@@ -267,13 +278,17 @@ folly::Future<Status> AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID
auto fut = pro.getFuture();
std::vector<folly::Future<Status>> futures;
for (auto& p : peers) {
if (!ActiveHostsMan::isLived(kv_, p)) {
LOG(INFO) << "[" << spaceId << ":" << partId << "], Skip the dead host " << p;
continue;
}
auto f = getResponse(p, req, [] (auto client, auto request) {
return client->future_checkPeers(request);
}, [] (auto&& resp) -> Status {
if (resp.get_code() == storage::cpp2::ErrorCode::SUCCEEDED) {
return Status::OK();
} else {
return Status::Error("Add part failed! code=%d",
return Status::Error("Check peers failed! code=%d",
static_cast<int32_t>(resp.get_code()));
}
});
@@ -316,12 +331,16 @@ folly::Future<Status> AdminClient::getResponse(
this] () mutable {
auto client = clientsMan_->client(host, evb);
remoteFunc(client, std::move(req)).via(evb)
.then([p = std::move(pro), partId, respGen = std::move(respGen)](
.then([p = std::move(pro), partId, respGen = std::move(respGen), host](
folly::Try<storage::cpp2::AdminExecResp>&& t) mutable {
// exception occurred during RPC
auto hostStr = network::NetworkUtils::intToIPv4(host.first);
if (t.hasException()) {
p.setValue(Status::Error(folly::stringPrintf("RPC failure in AdminClient: %s",
t.exception().what().c_str())));
p.setValue(Status::Error(folly::stringPrintf(
"[%s:%d] RPC failure in AdminClient: %s",
hostStr.c_str(),
host.second,
t.exception().what().c_str())));
return;
}
auto&& result = std::move(t).value().get_result();
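Two behavioral tweaks sit in this file: addPart now fetches the peer list and sends it along with the request, and checkPeers skips peers the meta service considers dead, so a single downed replica no longer fails the whole verification. The liveness filter, reduced to a standalone helper (a sketch; isLived models ActiveHostsMan::isLived):

    #include <algorithm>
    #include <iterator>
    #include <vector>

    // Keep only the peers the liveness check accepts.
    template <typename Host, typename LivenessFn>
    std::vector<Host> liveHosts(const std::vector<Host>& peers,
                                LivenessFn isLived) {
        std::vector<Host> out;
        std::copy_if(peers.begin(), peers.end(),
                     std::back_inserter(out), isLived);
        return out;
    }
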
4 changes: 4 additions & 0 deletions src/meta/processors/admin/BalancePlan.cpp
@@ -43,6 +43,10 @@ void BalancePlan::dispatchTasks() {

void BalancePlan::invoke() {
status_ = Status::IN_PROGRESS;
// Sort the tasks by id to ensure a stable order after recovery.
std::sort(tasks_.begin(), tasks_.end(), [](auto& l, auto& r) {
return l.taskIdStr() < r.taskIdStr();
});
dispatchTasks();
for (size_t i = 0; i < buckets_.size(); i++) {
for (size_t j = 0; j < buckets_[i].size(); j++) {
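Sorting the tasks by id before dispatch makes the execution order deterministic, so a plan recovered after a meta restart replays its tasks in the same order as the original run. A standalone sketch of the same idea:

    #include <algorithm>
    #include <string>
    #include <vector>

    struct Task {
        std::string taskId;  // stands in for BalanceTask::taskIdStr()
    };

    void sortForDeterministicReplay(std::vector<Task>& tasks) {
        std::sort(tasks.begin(), tasks.end(),
                  [](const Task& l, const Task& r) {
                      return l.taskId < r.taskId;
                  });
    }
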
14 changes: 12 additions & 2 deletions src/meta/processors/admin/BalanceTask.cpp
@@ -43,10 +43,20 @@ void BalanceTask::invoke() {
}
switch (status_) {
case Status::START: {
LOG(INFO) << taskIdStr_ << "Start to move part!";
status_ = Status::CHANGE_LEADER;
LOG(INFO) << taskIdStr_ << "Start to move part, check the peers firstly!";
ret_ = Result::IN_PROGRESS;
startTimeMs_ = time::WallClock::fastNowInMilliSec();
SAVE_STATE();
client_->checkPeers(spaceId_, partId_).thenValue([this] (auto&& resp) {
if (!resp.ok()) {
LOG(INFO) << taskIdStr_ << "Check the peers failed, status " << resp;
ret_ = Result::FAILED;
} else {
status_ = Status::CHANGE_LEADER;
}
invoke();
});
break;
}
// fallthrough
case Status::CHANGE_LEADER: {
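The START state no longer jumps straight to CHANGE_LEADER: it first saves IN_PROGRESS, kicks off the asynchronous peer check, and lets the completion callback set the next status and re-enter invoke(), so the switch runs again with the updated state. A simplified sketch of that re-entrant pattern (folly futures, the separate result field, and persistence are elided):

    #include <functional>

    enum class Status { START, CHANGE_LEADER, FAILED };

    // checkPeers is modeled as a call that completes a bool callback; in the
    // real code it is an RPC returning a folly::Future.
    void runStartStep(Status& status,
                      const std::function<void(std::function<void(bool)>)>&
                          checkPeers,
                      const std::function<void()>& reinvoke) {
        checkPeers([&status, reinvoke](bool ok) {
            // Record the outcome, then re-enter the state machine so the
            // switch sees the updated status.
            status = ok ? Status::CHANGE_LEADER : Status::FAILED;
            reinvoke();
        });
    }
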
7 changes: 6 additions & 1 deletion src/storage/admin/AdminProcessor.h
@@ -138,7 +138,12 @@ class AddPartProcessor : public BaseProcessor<cpp2::AdminExecResp> {
LOG(INFO) << "Space " << spaceId << " not exist, create it!";
store->addSpace(spaceId);
}
store->addPart(spaceId, partId, req.get_as_learner());
std::vector<HostAddr> peers;
for (auto& p : req.get_peers()) {
peers.emplace_back(
kvstore::NebulaStore::getRaftAddr(HostAddr(p.get_ip(), p.get_port())));
}
store->addPart(spaceId, partId, req.get_as_learner(), peers);
onFinished();
}

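On the storage side, AddPartProcessor translates each incoming service address into the corresponding raft address before handing the list to the store. A sketch under the assumption, matching this codebase's convention, that the raft port sits one above the storage service port:

    #include <cstdint>
    #include <utility>

    using HostAddr = std::pair<uint32_t, uint32_t>;  // (ip, port) stand-in

    // Assumption: raft port = service port + 1; treat the offset as
    // illustrative of what NebulaStore::getRaftAddr does.
    HostAddr getRaftAddr(const HostAddr& srvcAddr) {
        return {srvcAddr.first, srvcAddr.second + 1};
    }
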
