separate thenValue/thenError when rpc fails
critical27 committed Oct 11, 2021
1 parent c7c1bf8 commit 7b4cc7d
Showing 3 changed files with 157 additions and 119 deletions.
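The change in all three files is the same folly-futures refactor: instead of a single .then callback that receives a folly::Try<Response> and branches on hasException(), each RPC now chains a .thenValue continuation for the success path and typed .thenError continuations, one for apache::thrift::transport::TTransportException (so timeouts can be handled specially) and a catch-all for std::exception. A minimal sketch of the before/after pattern, with illustrative types rather than nebula's:

#include <folly/futures/Future.h>
#include <stdexcept>

// Before: one callback receives folly::Try<int> and must branch on
// hasException() to tell success from failure.
folly::Future<int> oldStyle(folly::Future<int>&& f) {
  return std::move(f).then([](folly::Try<int>&& t) {
    if (t.hasException()) {
      return -1;  // error and success share one code path
    }
    return t.value() * 2;
  });
}

// After: success and each error type get their own continuation, and only
// the matching one runs.
folly::Future<int> newStyle(folly::Future<int>&& f) {
  return std::move(f)
      .thenValue([](int v) { return v * 2; })         // success path only
      .thenError(folly::tag_t<std::runtime_error>{},  // one specific type
                 [](std::runtime_error&&) { return -1; })
      .thenError(folly::tag_t<std::exception>{},      // catch-all
                 [](std::exception&&) { return -2; });
}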
209 changes: 113 additions & 96 deletions src/clients/storage/StorageClientBase-inl.h
@@ -116,6 +116,7 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClientBase<ClientType>::c
folly::EventBase* evb,
std::unordered_map<HostAddr, Request> requests,
RemoteFunc&& remoteFunc) {
using TransportException = apache::thrift::transport::TTransportException;
auto context = std::make_shared<ResponseContext<Request, RemoteFunc, Response>>(
requests.size(), std::move(remoteFunc));

@@ -138,49 +139,64 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClientBase<ClientType>::c
// Since all requests are sent using the same eventbase, all
// then-callback will be executed on the same IO thread
.via(evb)
.then([this, context, host, spaceId, start](folly::Try<Response>&& val) {
if (val.hasException()) {
auto& r = context->findRequest(host);
LOG(ERROR) << "Request to " << host << " failed: " << val.exception().what();
auto parts = getReqPartsId(r);
context->resp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE);
invalidLeader(spaceId, parts);
context->resp.markFailure();
} else {
auto resp = std::move(val.value());
auto& result = resp.get_result();
bool hasFailure{false};
for (auto& code : result.get_failed_parts()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
<< static_cast<int32_t>(code.get_code());
hasFailure = true;
context->resp.emplaceFailedPart(code.get_part_id(), code.get_code());
if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
auto* leader = code.get_leader();
if (isValidHostPtr(leader)) {
updateLeader(spaceId, code.get_part_id(), *leader);
} else {
invalidLeader(spaceId, code.get_part_id());
}
} else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
invalidLeader(spaceId, code.get_part_id());
.thenValue([this, context, host, spaceId, start](Response&& resp) {
auto& result = resp.get_result();
bool hasFailure{false};
for (auto& code : result.get_failed_parts()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
<< static_cast<int32_t>(code.get_code());
hasFailure = true;
context->resp.emplaceFailedPart(code.get_part_id(), code.get_code());
if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
auto* leader = code.get_leader();
if (isValidHostPtr(leader)) {
updateLeader(spaceId, code.get_part_id(), *leader);
} else {
// do nothing
invalidLeader(spaceId, code.get_part_id());
}
} else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
invalidLeader(spaceId, code.get_part_id());
} else {
// do nothing
}
if (hasFailure) {
context->resp.markFailure();
}

// Adjust the latency
auto latency = result.get_latency_in_us();
context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start);

// Keep the response
context->resp.addResponse(std::move(resp));
}
if (hasFailure) {
context->resp.markFailure();
}

// Adjust the latency
auto latency = result.get_latency_in_us();
context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start);

// Keep the response
context->resp.addResponse(std::move(resp));
})
.thenError(folly::tag_t<TransportException>{},
[this, context, host, spaceId](TransportException&& ex) {
auto& r = context->findRequest(host);
auto parts = getReqPartsId(r);
if (ex.getType() == TransportException::TIMED_OUT) {
LOG(ERROR) << "Request to " << host << " time out: " << ex.what();
} else {
invalidLeader(spaceId, parts);
LOG(ERROR) << "Request to " << host << " failed: " << ex.what();
}
context->resp.appendFailedParts(parts,
nebula::cpp2::ErrorCode::E_RPC_FAILURE);
context->resp.markFailure();
})
.thenError(folly::tag_t<std::exception>{},
[this, context, host, spaceId](std::exception&& ex) {
auto& r = context->findRequest(host);
auto parts = getReqPartsId(r);
LOG(ERROR) << "Request to " << host << " failed: " << ex.what();
invalidLeader(spaceId, parts);
context->resp.appendFailedParts(parts,
nebula::cpp2::ErrorCode::E_RPC_FAILURE);
context->resp.markFailure();
})
.ensure([context, host] {
if (context->removeRequest(host)) {
// Received all responses
context->promise.setValue(std::move(context->resp));
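Note the asymmetry in the new TTransportException handler above: a TIMED_OUT failure is logged but deliberately skips invalidLeader, since a leader that is merely slow is most likely still the leader, while every other transport failure drops the cached leader. Both cases still append the request's parts with E_RPC_FAILURE and mark the whole response failed. A sketch of just that policy, using a hypothetical helper name:

#include <thrift/lib/cpp/transport/TTransportException.h>

using apache::thrift::transport::TTransportException;

// Hypothetical helper (not in the nebula codebase) capturing the policy:
// only non-timeout transport failures invalidate the cached leader.
bool shouldInvalidateLeader(const TTransportException& ex) {
  return ex.getType() != TTransportException::TIMED_OUT;
}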
@@ -200,75 +216,76 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClientBase<ClientType>::c
template <typename ClientType>
template <class Request, class RemoteFunc, class Response>
folly::Future<StatusOr<Response>> StorageClientBase<ClientType>::getResponse(
folly::EventBase* evb,
std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<StatusOr<Response>> pro) {
auto f = pro.getFuture();
getResponseImpl(evb,
std::forward<decltype(request)>(request),
std::forward<RemoteFunc>(remoteFunc),
std::move(pro));
folly::EventBase* evb, std::pair<HostAddr, Request>&& request, RemoteFunc&& remoteFunc) {
auto pro = std::make_shared<folly::Promise<StatusOr<Response>>>();
auto f = pro->getFuture();
getResponseImpl(
evb, std::forward<decltype(request)>(request), std::forward<RemoteFunc>(remoteFunc), pro);
return f;
}

template <typename ClientType>
template <class Request, class RemoteFunc, class Response>
void StorageClientBase<ClientType>::getResponseImpl(folly::EventBase* evb,
std::pair<HostAddr, Request> request,
RemoteFunc remoteFunc,
folly::Promise<StatusOr<Response>> pro) {
void StorageClientBase<ClientType>::getResponseImpl(
folly::EventBase* evb,
std::pair<HostAddr, Request> request,
RemoteFunc remoteFunc,
std::shared_ptr<folly::Promise<StatusOr<Response>>> pro) {
using TransportException = apache::thrift::transport::TTransportException;
if (evb == nullptr) {
DCHECK(!!ioThreadPool_);
evb = ioThreadPool_->getEventBase();
}
folly::via(evb,
[evb,
request = std::move(request),
remoteFunc = std::move(remoteFunc),
pro = std::move(pro),
this]() mutable {
auto host = request.first;
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
auto spaceId = request.second.get_space_id();
auto partsId = getReqPartsId(request.second);
LOG(INFO) << "Send request to storage " << host;
remoteFunc(client.get(), request.second)
.via(evb)
.then([spaceId,
partsId = std::move(partsId),
p = std::move(pro),
request = std::move(request),
remoteFunc = std::move(remoteFunc),
this](folly::Try<Response>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
p.setValue(Status::Error(folly::stringPrintf(
"RPC failure in StorageClient: %s", t.exception().what().c_str())));
invalidLeader(spaceId, partsId);
return;
}
auto&& resp = std::move(t.value());
// leader changed
auto& result = resp.get_result();
for (auto& code : result.get_failed_parts()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
<< static_cast<int32_t>(code.get_code());
if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
auto* leader = code.get_leader();
if (isValidHostPtr(leader)) {
updateLeader(spaceId, code.get_part_id(), *leader);
folly::via(
evb,
[evb, request = std::move(request), remoteFunc = std::move(remoteFunc), pro, this]() mutable {
auto host = request.first;
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
auto spaceId = request.second.get_space_id();
auto partsId = getReqPartsId(request.second);
LOG(INFO) << "Send request to storage " << host;
remoteFunc(client.get(), request.second)
.via(evb)
.thenValue([spaceId, pro, this](Response&& resp) mutable {
auto& result = resp.get_result();
for (auto& code : result.get_failed_parts()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
<< static_cast<int32_t>(code.get_code());
if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
auto* leader = code.get_leader();
if (isValidHostPtr(leader)) {
updateLeader(spaceId, code.get_part_id(), *leader);
} else {
invalidLeader(spaceId, code.get_part_id());
}
} else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
invalidLeader(spaceId, code.get_part_id());
}
}
pro->setValue(std::move(resp));
})
.thenError(folly::tag_t<TransportException>{},
[spaceId, partsId = std::move(partsId), host, pro, this](
TransportException&& ex) mutable {
if (ex.getType() == TransportException::TIMED_OUT) {
LOG(ERROR) << "Request to " << host << " time out: " << ex.what();
} else {
invalidLeader(spaceId, code.get_part_id());
invalidLeader(spaceId, partsId);
LOG(ERROR) << "Request to " << host << " failed: " << ex.what();
}
} else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
invalidLeader(spaceId, code.get_part_id());
}
}
p.setValue(std::move(resp));
});
}); // via
pro->setValue(Status::Error(
folly::stringPrintf("RPC failure in StorageClient: %s", ex.what())));
})
.thenError(folly::tag_t<std::exception>{},
[spaceId, partsId = std::move(partsId), host, pro, this](
std::exception&& ex) mutable {
// exception occurred during RPC
pro->setValue(Status::Error(
folly::stringPrintf("RPC failure in StorageClient: %s", ex.what())));
invalidLeader(spaceId, partsId);
});
}); // via
}
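getResponse previously took the folly::Promise by value and moved it through getResponseImpl into the single .then callback. With the split into .thenValue plus two .thenError continuations, a move-only promise can no longer be moved into each lambda, so the commit wraps it in a std::shared_ptr that all three continuations copy and exactly one fulfills. A minimal sketch of the idiom, with illustrative types:

#include <folly/futures/Future.h>
#include <memory>

// Every continuation shares ownership of the promise; whichever one fires
// fulfills it, and the returned future completes.
folly::Future<int> fulfillOnce(folly::Future<int>&& f) {
  auto pro = std::make_shared<folly::Promise<int>>();
  auto result = pro->getFuture();
  std::move(f)
      .thenValue([pro](int v) { pro->setValue(v); })
      .thenError(folly::tag_t<std::exception>{},
                 [pro](std::exception&&) { pro->setValue(-1); });
  return result;
}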

template <typename ClientType>
10 changes: 4 additions & 6 deletions src/clients/storage/StorageClientBase.h
@@ -142,11 +142,9 @@ class StorageClientBase {
class RemoteFunc,
class Response = typename std::result_of<RemoteFunc(ClientType* client,
const Request&)>::type::value_type>
folly::Future<StatusOr<Response>> getResponse(
folly::EventBase* evb,
std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<StatusOr<Response>> pro = folly::Promise<StatusOr<Response>>());
folly::Future<StatusOr<Response>> getResponse(folly::EventBase* evb,
std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc);
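The Response default above deduces the response type from the remote function itself: RemoteFunc is callable as remoteFunc(client, request) and returns a folly::Future<Response>, and folly::Future<T> exposes value_type = T. A compile-time sketch with illustrative types:

#include <folly/futures/Future.h>
#include <type_traits>

struct Client {};
struct GetReq {};
struct GetResp {};

// Stands in for a thrift-generated async call returning Future<GetResp>.
folly::Future<GetResp> rpcGet(Client*, const GetReq&);

using Deduced =
    std::result_of<decltype(&rpcGet)(Client*, const GetReq&)>::type::value_type;
static_assert(std::is_same<Deduced, GetResp>::value, "Response deduced as GetResp");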

template <class Request,
class RemoteFunc,
@@ -155,7 +153,7 @@
void getResponseImpl(folly::EventBase* evb,
std::pair<HostAddr, Request> request,
RemoteFunc remoteFunc,
folly::Promise<StatusOr<Response>> pro);
std::shared_ptr<folly::Promise<StatusOr<Response>>> pro);

// Cluster given ids into the host they belong to
// The method returns a map
57 changes: 40 additions & 17 deletions src/kvstore/raftex/Host.cpp
@@ -156,24 +156,10 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) {
}

void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req) {
sendAppendLogRequest(eb, std::move(req))
using TransportException = apache::thrift::transport::TTransportException;
sendAppendLogRequest(eb, req)
.via(eb)
.then([eb, self = shared_from_this()](folly::Try<cpp2::AppendLogResponse>&& t) {
VLOG(3) << self->idStr_ << "appendLogs() call got response";
if (t.hasException()) {
VLOG(2) << self->idStr_ << t.exception().what();
cpp2::AppendLogResponse r;
r.set_error_code(cpp2::ErrorCode::E_EXCEPTION);
{
std::lock_guard<std::mutex> g(self->lock_);
self->setResponse(r);
self->lastLogIdSent_ = self->logIdToSend_ - 1;
}
self->noMoreRequestCV_.notify_all();
return;
}

cpp2::AppendLogResponse resp = std::move(t).value();
.thenValue([eb, self = shared_from_this()](cpp2::AppendLogResponse&& resp) {
LOG_IF(INFO, FLAGS_trace_raft)
<< self->idStr_ << "AppendLogResponse "
<< "code " << apache::thrift::util::enumNameSafe(resp.get_error_code()) << ", currTerm "
@@ -358,6 +344,43 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::Append
return;
}
}
})
.thenError(folly::tag_t<TransportException>{},
[self = shared_from_this(), req](TransportException&& ex) {
VLOG(2) << self->idStr_ << ex.what();
cpp2::AppendLogResponse r;
r.set_error_code(cpp2::ErrorCode::E_EXCEPTION);
{
std::lock_guard<std::mutex> g(self->lock_);
if (ex.getType() == TransportException::TIMED_OUT) {
VLOG(2) << self->idStr_ << "append log time out"
<< ", space " << req->get_space() << ", part " << req->get_part()
<< ", current term " << req->get_current_term() << ", last_log_id "
<< req->get_last_log_id() << ", committed_id "
<< req->get_committed_log_id() << ", last_log_term_sent"
<< req->get_last_log_term_sent() << ", last_log_id_sent "
<< req->get_last_log_id_sent() << ", logs size "
<< req->get_log_str_list().size();
}
self->setResponse(r);
self->lastLogIdSent_ = self->logIdToSend_ - 1;
}
// a new raft log or heartbeat will trigger another appendLogs in Host
self->noMoreRequestCV_.notify_all();
return;
})
.thenError(folly::tag_t<std::exception>{}, [self = shared_from_this()](std::exception&& ex) {
VLOG(2) << self->idStr_ << ex.what();
cpp2::AppendLogResponse r;
r.set_error_code(cpp2::ErrorCode::E_EXCEPTION);
{
std::lock_guard<std::mutex> g(self->lock_);
self->setResponse(r);
self->lastLogIdSent_ = self->logIdToSend_ - 1;
}
// a new raft log or heartbeat will trigger another appendLogs in Host
self->noMoreRequestCV_.notify_all();
return;
});
}
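Host.cpp applies the same split to raft log replication: the .thenValue path keeps the existing response handling, while both .thenError continuations record E_EXCEPTION, rewind lastLogIdSent_ so the next append resends from the last acknowledged log, and wake noMoreRequestCV_; the transport handler additionally logs the full request details when the failure was a timeout. A simplified sketch of the rollback-and-notify step both error handlers share, with hypothetical names:

#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical, stripped-down replica tracker illustrating the pattern.
struct Replicator {
  std::mutex lock_;
  std::condition_variable noMoreRequestCV_;
  int64_t lastLogIdSent_ = 0;
  int64_t logIdToSend_ = 0;

  void onAppendFailed() {
    {
      std::lock_guard<std::mutex> g(lock_);
      lastLogIdSent_ = logIdToSend_ - 1;  // resend from the last acked log
    }
    // A later raft log or heartbeat triggers another round of appendLogs().
    noMoreRequestCV_.notify_all();
  }
};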

