Skip to content

Commit

Permalink
Refactor AdminClient getResponseFromLeader
Browse files Browse the repository at this point in the history
fuck this function
  • Loading branch information
yixinglu committed Mar 7, 2022
1 parent 631a7b3 commit b092688
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 99 deletions.
144 changes: 53 additions & 91 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <folly/futures/Future-pre.h>
#include <thrift/lib/cpp/util/EnumUtils.h>

#include <iterator>
#include <type_traits>

#include "common/utils/MetaKeyUtils.h"
Expand Down Expand Up @@ -125,17 +126,13 @@ folly::Future<Status> AdminClient::addLearner(GraphSpaceID spaceId,
}

auto peers = std::move(nebula::value(partHosts));
folly::Promise<Status> pro;
auto f = pro.getFuture();
getResponseFromLeader(
return getResponseFromLeader(
getAdminAddrFromPeers(peers),
0,
std::move(req),
[](auto client, auto request) { return client->semifuture_addLearner(request); },
0,
std::move(pro),
FLAGS_max_retry_times_admin_op);
return f;
}

folly::Future<Status> AdminClient::waitingForCatchUpData(GraphSpaceID spaceId,
Expand All @@ -153,17 +150,13 @@ folly::Future<Status> AdminClient::waitingForCatchUpData(GraphSpaceID spaceId,
}

auto peers = std::move(nebula::value(partHosts));
folly::Promise<Status> pro;
auto f = pro.getFuture();
getResponseFromLeader(
return getResponseFromLeader(
getAdminAddrFromPeers(peers),
0,
std::move(req),
[](auto client, auto request) { return client->semifuture_waitingForCatchUpData(request); },
0,
std::move(pro),
FLAGS_max_retry_times_admin_op);
return f;
}

folly::Future<Status> AdminClient::memberChange(GraphSpaceID spaceId,
Expand All @@ -183,17 +176,13 @@ folly::Future<Status> AdminClient::memberChange(GraphSpaceID spaceId,
}

auto peers = std::move(nebula::value(partHosts));
folly::Promise<Status> pro;
auto f = pro.getFuture();
getResponseFromLeader(
return getResponseFromLeader(
getAdminAddrFromPeers(peers),
0,
std::move(req),
[](auto client, auto request) { return client->semifuture_memberChange(request); },
0,
std::move(pro),
FLAGS_max_retry_times_admin_op);
return f;
}

folly::Future<Status> AdminClient::updateMeta(GraphSpaceID spaceId,
Expand Down Expand Up @@ -402,54 +391,29 @@ folly::Future<StatusOr<Response>> AdminClient::getResponseFromHost(const HostAdd
}

template <typename Request, typename RemoteFunc>
void AdminClient::getResponseFromLeader(std::vector<HostAddr> hosts,
int32_t index,
Request req,
RemoteFunc remoteFunc,
int32_t retry,
folly::Promise<Status> pro,
int32_t retryLimit) {
folly::Future<Status> AdminClient::getResponseFromLeader(std::vector<HostAddr> hosts,
int32_t index,
Request req,
RemoteFunc remoteFunc,
int32_t retry,
int32_t retryLimit) {
static_assert(
folly::isSemiFuture<
typename std::result_of<RemoteFunc(std::shared_ptr<ClientType>, Request)>::type>::value);
using RetryReturnType = std::tuple<Status, int32_t, std::vector<HostAddr>>;

auto* evb = ioThreadPool_->getEventBase();
CHECK_GE(index, 0);
CHECK_LT(index, hosts.size());
auto client = clientsMan_->client(hosts[index], evb);
static const std::string kNeedToRetryMsg = "__NEED_TO_RETRY__";
remoteFunc(client, req)
.via(evb)
.then([p = std::move(pro),
hosts = std::move(hosts),
index,
req = std::move(req),
remoteFunc = std::move(remoteFunc),
retry,
retryLimit,
this](folly::Try<storage::cpp2::AdminExecResp>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
if (retry < retryLimit) {
LOG(INFO) << "Rpc failure to " << hosts[index] << ", retry " << retry << ", limit "
<< retryLimit << ", error: " << t.exception();
index = (index + 1) % hosts.size();
getResponseFromLeader(std::move(hosts),
index,
std::move(req),
std::move(remoteFunc),
retry + 1,
std::move(p),
retryLimit);
return;
}
p.setValue(Status::Error("RPC failure in AdminClient: %s", t.exception().what().c_str()));
return;
}
auto&& adminResp = std::move(t.value());
.thenValue([hosts, index, retry, retryLimit](
storage::cpp2::AdminExecResp&& adminResp) mutable -> RetryReturnType {
if (adminResp.result.get_failed_parts().empty()) {
// succeeded
p.setValue(Status::OK());
return;
return std::make_tuple(Status::OK(), index, std::move(hosts));
}
auto resp = adminResp.result.get_failed_parts().front();
switch (resp.get_code()) {
Expand All @@ -464,68 +428,66 @@ void AdminClient::getResponseFromLeader(std::vector<HostAddr> hosts,
usleep(1000 * 50);
LOG(INFO) << "The leader is in election, retry " << retry << ", limit "
<< retryLimit;
index = (index + 1) % hosts.size();
getResponseFromLeader(std::move(hosts),
index,
std::move(req),
std::move(remoteFunc),
retry + 1,
std::move(p),
retryLimit);
return;
return std::make_tuple(
Status::Error(kNeedToRetryMsg), (index + 1) % hosts.size(), std::move(hosts));
}
// convert to admin addr
leader = Utils::getAdminAddrFromStoreAddr(leader);
int32_t leaderIndex = 0;
for (auto& h : hosts) {
if (h == leader) {
break;
}
leaderIndex++;
}
if (leaderIndex == (int32_t)hosts.size()) {
auto iter = std::find(hosts.begin(), hosts.end(), leader);
int32_t leaderIndex = std::distance(hosts.begin(), iter);
if (iter == hosts.end()) {
// In some cases (e.g. balance task is failed in member
// change remove phase, and the new host is elected as
// leader somehow), the peers of this partition in meta
// doesn't include new host, which will make this task
// failed forever.
index = leaderIndex;
hosts.emplace_back(leader);
}
LOG(INFO) << "Return leader change from " << hosts[index] << ", new leader is "
<< leader << ", retry " << retry << ", limit " << retryLimit;
CHECK_LT(leaderIndex, hosts.size());
getResponseFromLeader(std::move(hosts),
leaderIndex,
std::move(req),
std::move(remoteFunc),
retry + 1,
std::move(p),
retryLimit);
return;
return std::make_tuple(Status::Error(kNeedToRetryMsg), leaderIndex, std::move(hosts));
}
p.setValue(Status::Error("Leader changed!"));
return;
return std::make_tuple(Status::Error("Leader changed!"), index, std::move(hosts));
}
default: {
if (retry < retryLimit) {
LOG(INFO) << "Unknown code " << static_cast<int32_t>(resp.get_code()) << " from "
<< hosts[index] << ", retry " << retry << ", limit " << retryLimit;
index = (index + 1) % hosts.size();
getResponseFromLeader(std::move(hosts),
index,
std::move(req),
std::move(remoteFunc),
retry + 1,
std::move(p),
retryLimit);
return;
return std::make_tuple(
Status::Error(kNeedToRetryMsg), (index + 1) % hosts.size(), std::move(hosts));
}
p.setValue(Status::Error("Unknown code %d", static_cast<int32_t>(resp.get_code())));
return;
return std::make_tuple(
Status::Error("Unknown code %d", static_cast<int32_t>(resp.get_code())),
index,
std::move(hosts));
}
}
}); // then
})
.thenError([hosts, index, retry, retryLimit](exception_wrapper&& ex) -> RetryReturnType {
// exception occurred during RPC
if (retry < retryLimit) {
LOG(INFO) << "Rpc failure to " << hosts[index] << ", retry " << retry << ", limit "
<< retryLimit << ", error: " << ex.get_exception()->what();
return std::make_tuple(
Status::Error(kNeedToRetryMsg), (index + 1) % hosts.size(), std::move(hosts));
}
return std::make_tuple(
Status::Error("RPC failure in AdminClient: %s", ex.get_exception()->what()),
index,
std::move(hosts));
})
.thenValue([req = std::move(req), remoteFunc = std::move(remoteFunc), retry, retryLimit](
RetryReturnType&& tup) {
auto status = std::get<0>(tup);
if (status.ok() || status.message().find(kNeedToRetryMsg) == std::string::npos) {
return status;
}
auto idx = std::get<1>(tup);
auto hosts = std::get<2>(tup);
return getResponseFromLeader(
std::move(hosts), idx, std::move(req), std::move(remoteFunc), retry + 1, retryLimit);
});
}

// todo(doodle): add related locks
Expand Down
15 changes: 7 additions & 8 deletions src/meta/processors/admin/AdminClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,17 +288,16 @@ class AdminClient {
* @param req RPC request
* @param remoteFunc Client's RPC function
* @param retry Current retry times
* @param pro Promise of result
* @param retryLimit Max retry times
* @return folly::Future<Status>
*/
template <typename Request, typename RemoteFunc>
void getResponseFromLeader(std::vector<HostAddr> hosts,
int32_t index,
Request req,
RemoteFunc remoteFunc,
int32_t retry,
folly::Promise<Status> pro,
int32_t retryLimit);
folly::Future<Status> getResponseFromLeader(std::vector<HostAddr> hosts,
int32_t index,
Request req,
RemoteFunc remoteFunc,
int32_t retry,
int32_t retryLimit);

folly::Future<StatusOr<std::unordered_map<GraphSpaceID, std::vector<PartitionID>>>> getLeaderDist(
const HostAddr& host);
Expand Down

0 comments on commit b092688

Please sign in to comment.