Skip to content

Commit

Permalink
Add check peers at start stage
Browse files Browse the repository at this point in the history
  • Loading branch information
heng committed Jul 24, 2020
1 parent e035bdb commit eaeda4d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 12 deletions.
24 changes: 18 additions & 6 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
if (ts > startTimeMs_) {
commitTransLeader(newLeader);
} else {
LOG(INFO) << idStr_ << "Skip commit stale transfer leader " << newLeader;
LOG(INFO) << idStr_ << "Skip commit stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand All @@ -287,7 +289,9 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
if (ts > startTimeMs_) {
commitRemovePeer(peer);
} else {
LOG(INFO) << idStr_ << "Skip commit stale remove peer " << peer;
LOG(INFO) << idStr_ << "Skip commit stale remove peer " << peer
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand Down Expand Up @@ -364,7 +368,9 @@ bool Part::preProcessLog(LogID logId,
LOG(INFO) << idStr_ << "preprocess add learner " << learner;
addLearner(learner);
} else {
LOG(INFO) << idStr_ << "Skip stale add learner " << learner;
LOG(INFO) << idStr_ << "Skip stale add learner " << learner
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand All @@ -375,7 +381,9 @@ bool Part::preProcessLog(LogID logId,
LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
} else {
LOG(INFO) << idStr_ << "Skip stale transfer leader " << newLeader;
LOG(INFO) << idStr_ << "Skip stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand All @@ -386,7 +394,9 @@ bool Part::preProcessLog(LogID logId,
LOG(INFO) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
} else {
LOG(INFO) << idStr_ << "Skip stale add peer " << peer;
LOG(INFO) << idStr_ << "Skip stale add peer " << peer
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand All @@ -397,7 +407,9 @@ bool Part::preProcessLog(LogID logId,
LOG(INFO) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
} else {
LOG(INFO) << idStr_ << "Skip stale remove peer " << peer;
LOG(INFO) << idStr_ << "Skip stale remove peer " << peer
<< ", the part is opened at " << startTimeMs_
<< ", but the log timestamp is " << ts;
}
break;
}
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,10 @@ typename RaftPart::Role RaftPart::processElectionResponses(
<< ", double my election interval.";
uint64_t curWeight = weight_.load();
weight_.store(curWeight * 2);
} else {
LOG(ERROR) << idStr_ << "Receive response about askForVote from "
<< hosts[r.first]->address()
<< ", error code is " << static_cast<int32_t>(r.second.get_error_code());
}
}

Expand Down
16 changes: 12 additions & 4 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,17 @@ folly::Future<Status> AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID
auto fut = pro.getFuture();
std::vector<folly::Future<Status>> futures;
for (auto& p : peers) {
if (!ActiveHostsMan::isLived(kv_, p)) {
LOG(INFO) << "[" << spaceId << ":" << partId << "], Skip the dead host " << p;
continue;
}
auto f = getResponse(p, req, [] (auto client, auto request) {
return client->future_checkPeers(request);
}, [] (auto&& resp) -> Status {
if (resp.get_code() == storage::cpp2::ErrorCode::SUCCEEDED) {
return Status::OK();
} else {
return Status::Error("Add part failed! code=%d",
return Status::Error("Check peers failed! code=%d",
static_cast<int32_t>(resp.get_code()));
}
});
Expand Down Expand Up @@ -327,12 +331,16 @@ folly::Future<Status> AdminClient::getResponse(
this] () mutable {
auto client = clientsMan_->client(host, evb);
remoteFunc(client, std::move(req)).via(evb)
.then([p = std::move(pro), partId, respGen = std::move(respGen)](
.then([p = std::move(pro), partId, respGen = std::move(respGen), host](
folly::Try<storage::cpp2::AdminExecResp>&& t) mutable {
// exception occurred during RPC
auto hostStr = network::NetworkUtils::intToIPv4(host.first);
if (t.hasException()) {
p.setValue(Status::Error(folly::stringPrintf("RPC failure in AdminClient: %s",
t.exception().what().c_str())));
p.setValue(Status::Error(folly::stringPrintf(
"[%s:%d] RPC failure in AdminClient: %s",
hostStr.c_str(),
host.second,
t.exception().what().c_str())));
return;
}
auto&& result = std::move(t).value().get_result();
Expand Down
14 changes: 12 additions & 2 deletions src/meta/processors/admin/BalanceTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,20 @@ void BalanceTask::invoke() {
}
switch (status_) {
case Status::START: {
LOG(INFO) << taskIdStr_ << "Start to move part!";
status_ = Status::CHANGE_LEADER;
LOG(INFO) << taskIdStr_ << "Start to move part, check the peers firstly!";
ret_ = Result::IN_PROGRESS;
startTimeMs_ = time::WallClock::fastNowInMilliSec();
SAVE_STATE();
client_->checkPeers(spaceId_, partId_).thenValue([this] (auto&& resp) {
if (!resp.ok()) {
LOG(INFO) << taskIdStr_ << "Check the peers failed, status " << resp;
ret_ = Result::FAILED;
} else {
status_ = Status::CHANGE_LEADER;
}
invoke();
});
break;
}
// fallthrough
case Status::CHANGE_LEADER: {
Expand Down

0 comments on commit eaeda4d

Please sign in to comment.