Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance the failover logic for balance procedure. #2232

Merged
merged 3 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ struct AddPartReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
3: bool as_learner,
4: list<common.HostAddr> peers,
}

struct RemovePartReq {
Expand Down
54 changes: 36 additions & 18 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,20 @@ void NebulaStore::addSpace(GraphSpaceID spaceId) {
}


void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) {
void NebulaStore::addPart(GraphSpaceID spaceId,
PartitionID partId,
bool asLearner,
const std::vector<HostAddr>& peers) {
folly::RWSpinLock::WriteHolder wh(&lock_);
auto spaceIt = this->spaces_.find(spaceId);
CHECK(spaceIt != this->spaces_.end()) << "Space should exist!";
if (spaceIt->second->parts_.find(partId) != spaceIt->second->parts_.end()) {
LOG(INFO) << "[" << spaceId << "," << partId << "] has existed!";
auto partIt = spaceIt->second->parts_.find(partId);
if (partIt != spaceIt->second->parts_.end()) {
LOG(INFO) << "[Space: " << spaceId << ", Part: " << partId << "] has existed!";
if (!peers.empty()) {
LOG(INFO) << "[Space: " << spaceId << ", Part: " << partId << "] check peers...";
partIt->second->checkAndResetPeers(peers);
}
return;
}

Expand All @@ -277,15 +285,16 @@ void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearn
targetEngine->addPart(partId);
spaceIt->second->parts_.emplace(
partId,
newPart(spaceId, partId, targetEngine.get(), asLearner));
newPart(spaceId, partId, targetEngine.get(), asLearner, peers));
LOG(INFO) << "Space " << spaceId << ", part " << partId
<< " has been added, asLearner " << asLearner;
}

std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
PartitionID partId,
KVEngine* engine,
bool asLearner) {
bool asLearner,
const std::vector<HostAddr>& defaultPeers) {
auto part = std::make_shared<Part>(spaceId,
partId,
raftAddr_,
Expand All @@ -297,20 +306,29 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
bgWorkers_,
workers_,
snapshot_);
auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
if (!metaStatus.ok()) {
LOG(ERROR) << "options_.partMan_->partMeta(spaceId, partId); error: "
<< metaStatus.status().toString()
<< " spaceId: " << spaceId << ", partId: " << partId;
return nullptr;
}

auto partMeta = metaStatus.value();
std::vector<HostAddr> peers;
for (auto& h : partMeta.peers_) {
if (h != storeSvcAddr_) {
peers.emplace_back(getRaftAddr(h));
VLOG(1) << "Add peer " << peers.back();
if (defaultPeers.empty()) {
// pull the information from meta
auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
if (!metaStatus.ok()) {
LOG(ERROR) << "options_.partMan_->partMeta(spaceId, partId); error: "
<< metaStatus.status().toString()
<< " spaceId: " << spaceId << ", partId: " << partId;
return nullptr;
}

auto partMeta = metaStatus.value();
for (auto& h : partMeta.peers_) {
if (h != storeSvcAddr_) {
peers.emplace_back(getRaftAddr(h));
VLOG(1) << "Add peer " << peers.back();
}
}
} else {
for (auto& h : defaultPeers) {
if (h != raftAddr_) {
peers.emplace_back(h);
}
}
}
raftService_->addPartition(part);
Expand Down
8 changes: 6 additions & 2 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,10 @@ class NebulaStore : public KVStore, public Handler {
* */
void addSpace(GraphSpaceID spaceId) override;

void addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) override;
void addPart(GraphSpaceID spaceId,
PartitionID partId,
bool asLearner,
const std::vector<HostAddr>& peers = {}) override;

void removeSpace(GraphSpaceID spaceId) override;

Expand All @@ -229,7 +232,8 @@ class NebulaStore : public KVStore, public Handler {
std::shared_ptr<Part> newPart(GraphSpaceID spaceId,
PartitionID partId,
KVEngine* engine,
bool asLearner);
bool asLearner,
const std::vector<HostAddr>& defaultPeers);

ErrorOr<ResultCode, KVEngine*> engine(GraphSpaceID spaceId, PartitionID partId);

Expand Down
24 changes: 18 additions & 6 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
if (ts > startTimeMs_) {
commitTransLeader(newLeader);
} else {
LOG(INFO) << idStr_ << "Skip commit stale transfer leader " << newLeader;
LOG(INFO) << idStr_ << "Skip commit stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand All @@ -287,7 +289,9 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
if (ts > startTimeMs_) {
commitRemovePeer(peer);
} else {
LOG(INFO) << idStr_ << "Skip commit stale remove peer " << peer;
LOG(INFO) << idStr_ << "Skip commit stale remove peer " << peer
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand Down Expand Up @@ -364,7 +368,9 @@ bool Part::preProcessLog(LogID logId,
LOG(INFO) << idStr_ << "preprocess add learner " << learner;
addLearner(learner);
} else {
LOG(INFO) << idStr_ << "Skip stale add learner " << learner;
LOG(INFO) << idStr_ << "Skip stale add learner " << learner
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand All @@ -375,7 +381,9 @@ bool Part::preProcessLog(LogID logId,
LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
} else {
LOG(INFO) << idStr_ << "Skip stale transfer leader " << newLeader;
LOG(INFO) << idStr_ << "Skip stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand All @@ -386,7 +394,9 @@ bool Part::preProcessLog(LogID logId,
LOG(INFO) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
} else {
LOG(INFO) << idStr_ << "Skip stale add peer " << peer;
LOG(INFO) << idStr_ << "Skip stale add peer " << peer
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand All @@ -397,7 +407,9 @@ bool Part::preProcessLog(LogID logId,
LOG(INFO) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
} else {
LOG(INFO) << idStr_ << "Skip stale remove peer " << peer;
LOG(INFO) << idStr_ << "Skip stale remove peer " << peer
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/PartManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void MetaServerBasedPartManager::onSpaceOptionUpdated(

void MetaServerBasedPartManager::onPartAdded(const PartMeta& partMeta) {
if (handler_ != nullptr) {
handler_->addPart(partMeta.spaceId_, partMeta.partId_, false);
handler_->addPart(partMeta.spaceId_, partMeta.partId_, false, {});
} else {
VLOG(1) << "handler_ is nullptr!";
}
Expand Down
13 changes: 11 additions & 2 deletions src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,22 @@ namespace kvstore {
class Handler {
public:
virtual ~Handler() = default;

virtual void addSpace(GraphSpaceID spaceId) = 0;
virtual void addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) = 0;

virtual void addPart(GraphSpaceID spaceId,
PartitionID partId,
bool asLearner,
const std::vector<HostAddr>& peers) = 0;

virtual void updateSpaceOption(GraphSpaceID spaceId,
const std::unordered_map<std::string, std::string>& options,
bool isDbOption) = 0;

virtual void removeSpace(GraphSpaceID spaceId) = 0;

virtual void removePart(GraphSpaceID spaceId, PartitionID partId) = 0;

virtual int32_t allLeader(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderIds) = 0;
};
Expand Down Expand Up @@ -105,7 +114,7 @@ class MemPartManager final : public PartManager {
handler_->addSpace(spaceId);
}
if (noPart && handler_) {
handler_->addPart(spaceId, partId, false);
handler_->addPart(spaceId, partId, false, {});
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,10 @@ typename RaftPart::Role RaftPart::processElectionResponses(
<< ", double my election interval.";
uint64_t curWeight = weight_.load();
weight_.store(curWeight * 2);
} else {
LOG(ERROR) << idStr_ << "Receive response about askForVote from "
<< hosts[r.first]->address()
<< ", error code is " << static_cast<int32_t>(r.second.get_error_code());
}
}

Expand Down Expand Up @@ -1619,7 +1623,7 @@ cpp2::ErrorCode RaftPart::verifyLeader(
return h->address() == candidate;
});
if (it == hosts.end()) {
VLOG(2) << idStr_ << "The candidate leader " << candidate << " is not my peers";
LOG(INFO) << idStr_ << "The candidate leader " << candidate << " is not my peers";
return cpp2::ErrorCode::E_WRONG_LEADER;
}

Expand Down Expand Up @@ -1833,7 +1837,8 @@ AppendLogResult RaftPart::isCatchedUp(const HostAddr& peer) {
}
for (auto& host : hosts_) {
if (host->addr_ == peer) {
if (host->followerCommittedLogId_ < wal_->firstLogId()) {
if (host->followerCommittedLogId_ == 0
|| host->followerCommittedLogId_ < wal_->firstLogId()) {
LOG(INFO) << idStr_ << "The committed log id of peer is "
<< host->followerCommittedLogId_
<< ", which is invalid or less than my first wal log id";
Expand Down
6 changes: 4 additions & 2 deletions src/kvstore/raftex/test/MemberChangeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ TEST(MemberChangeTest, AddRemovePeerTest) {
LOG(INFO) << "Add the same peer again!";
auto f = leader->sendCommandAsync(test::encodeAddPeer(allHosts[3]));
f.wait();

// sleep a while to ensure the learner receive the command.
sleep(1);
for (auto& c : copies) {
CHECK_EQ(3, c->hosts_.size());
}
Expand All @@ -68,7 +69,8 @@ TEST(MemberChangeTest, AddRemovePeerTest) {
LOG(INFO) << "Remove the peer added!";
auto f = leader->sendCommandAsync(test::encodeRemovePeer(allHosts[3]));
f.wait();

// sleep a while to ensure the learner receive the command.
sleep(1);
for (size_t i = 0; i < copies.size() - 1; i++) {
CHECK_EQ(2, copies[i]->hosts_.size());
}
Expand Down
27 changes: 23 additions & 4 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ folly::Future<Status> AdminClient::addPart(GraphSpaceID spaceId,
req.set_space_id(spaceId);
req.set_part_id(partId);
req.set_as_learner(asLearner);
auto ret = getPeers(spaceId, partId);
if (!ret.ok()) {
return ret.status();
}
auto peers = std::move(ret).value();
std::vector<nebula::cpp2::HostAddr> thriftPeers;
thriftPeers.resize(peers.size());
std::transform(peers.begin(), peers.end(), thriftPeers.begin(), [this](const auto& h) {
return toThriftHost(h);
});
req.set_peers(std::move(thriftPeers));
return getResponse(host, std::move(req), [] (auto client, auto request) {
return client->future_addPart(request);
}, [] (auto&& resp) -> Status {
Expand Down Expand Up @@ -267,13 +278,17 @@ folly::Future<Status> AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID
auto fut = pro.getFuture();
std::vector<folly::Future<Status>> futures;
for (auto& p : peers) {
if (!ActiveHostsMan::isLived(kv_, p)) {
LOG(INFO) << "[" << spaceId << ":" << partId << "], Skip the dead host " << p;
continue;
}
auto f = getResponse(p, req, [] (auto client, auto request) {
return client->future_checkPeers(request);
}, [] (auto&& resp) -> Status {
if (resp.get_code() == storage::cpp2::ErrorCode::SUCCEEDED) {
return Status::OK();
} else {
return Status::Error("Add part failed! code=%d",
return Status::Error("Check peers failed! code=%d",
static_cast<int32_t>(resp.get_code()));
}
});
Expand Down Expand Up @@ -316,12 +331,16 @@ folly::Future<Status> AdminClient::getResponse(
this] () mutable {
auto client = clientsMan_->client(host, evb);
remoteFunc(client, std::move(req)).via(evb)
.then([p = std::move(pro), partId, respGen = std::move(respGen)](
.then([p = std::move(pro), partId, respGen = std::move(respGen), host](
folly::Try<storage::cpp2::AdminExecResp>&& t) mutable {
// exception occurred during RPC
auto hostStr = network::NetworkUtils::intToIPv4(host.first);
if (t.hasException()) {
p.setValue(Status::Error(folly::stringPrintf("RPC failure in AdminClient: %s",
t.exception().what().c_str())));
p.setValue(Status::Error(folly::stringPrintf(
"[%s:%d] RPC failure in AdminClient: %s",
hostStr.c_str(),
host.second,
t.exception().what().c_str())));
return;
}
auto&& result = std::move(t).value().get_result();
Expand Down
4 changes: 4 additions & 0 deletions src/meta/processors/admin/BalancePlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ void BalancePlan::dispatchTasks() {

void BalancePlan::invoke() {
status_ = Status::IN_PROGRESS;
// Sort the tasks by its id to ensure the order after recovery.
std::sort(tasks_.begin(), tasks_.end(), [](auto& l, auto& r) {
return l.taskIdStr() < r.taskIdStr();
});
dispatchTasks();
for (size_t i = 0; i < buckets_.size(); i++) {
for (size_t j = 0; j < buckets_[i].size(); j++) {
Expand Down
14 changes: 12 additions & 2 deletions src/meta/processors/admin/BalanceTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,20 @@ void BalanceTask::invoke() {
}
switch (status_) {
case Status::START: {
LOG(INFO) << taskIdStr_ << "Start to move part!";
status_ = Status::CHANGE_LEADER;
LOG(INFO) << taskIdStr_ << "Start to move part, check the peers firstly!";
ret_ = Result::IN_PROGRESS;
startTimeMs_ = time::WallClock::fastNowInMilliSec();
SAVE_STATE();
client_->checkPeers(spaceId_, partId_).thenValue([this] (auto&& resp) {
if (!resp.ok()) {
LOG(INFO) << taskIdStr_ << "Check the peers failed, status " << resp;
ret_ = Result::FAILED;
} else {
status_ = Status::CHANGE_LEADER;
}
invoke();
});
break;
}
// fallthrough
case Status::CHANGE_LEADER: {
Expand Down
7 changes: 6 additions & 1 deletion src/storage/admin/AdminProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,12 @@ class AddPartProcessor : public BaseProcessor<cpp2::AdminExecResp> {
LOG(INFO) << "Space " << spaceId << " not exist, create it!";
store->addSpace(spaceId);
}
store->addPart(spaceId, partId, req.get_as_learner());
std::vector<HostAddr> peers;
for (auto& p : req.get_peers()) {
peers.emplace_back(
kvstore::NebulaStore::getRaftAddr(HostAddr(p.get_ip(), p.get_port())));
}
store->addPart(spaceId, partId, req.get_as_learner(), peers);
onFinished();
}

Expand Down