diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index 65eb546b381..f1ba341a011 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -116,6 +116,7 @@ folly::SemiFuture> StorageClientBase::c folly::EventBase* evb, std::unordered_map requests, RemoteFunc&& remoteFunc) { + using TransportException = apache::thrift::transport::TTransportException; auto context = std::make_shared>( requests.size(), std::move(remoteFunc)); @@ -138,49 +139,64 @@ folly::SemiFuture> StorageClientBase::c // Since all requests are sent using the same eventbase, all // then-callback will be executed on the same IO thread .via(evb) - .then([this, context, host, spaceId, start](folly::Try&& val) { - if (val.hasException()) { - auto& r = context->findRequest(host); - LOG(ERROR) << "Request to " << host << " failed: " << val.exception().what(); - auto parts = getReqPartsId(r); - context->resp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE); - invalidLeader(spaceId, parts); - context->resp.markFailure(); - } else { - auto resp = std::move(val.value()); - auto& result = resp.get_result(); - bool hasFailure{false}; - for (auto& code : result.get_failed_parts()) { - VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code " - << static_cast(code.get_code()); - hasFailure = true; - context->resp.emplaceFailedPart(code.get_part_id(), code.get_code()); - if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - auto* leader = code.get_leader(); - if (isValidHostPtr(leader)) { - updateLeader(spaceId, code.get_part_id(), *leader); - } else { - invalidLeader(spaceId, code.get_part_id()); - } - } else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND || - code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) { - invalidLeader(spaceId, code.get_part_id()); + .thenValue([this, context, host, spaceId, start](Response&& resp) { + auto& result = resp.get_result(); + bool hasFailure{false}; + for (auto& code : result.get_failed_parts()) { + VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code " + << static_cast(code.get_code()); + hasFailure = true; + context->resp.emplaceFailedPart(code.get_part_id(), code.get_code()); + if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { + auto* leader = code.get_leader(); + if (isValidHostPtr(leader)) { + updateLeader(spaceId, code.get_part_id(), *leader); } else { - // do nothing + invalidLeader(spaceId, code.get_part_id()); } + } else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND || + code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) { + invalidLeader(spaceId, code.get_part_id()); + } else { + // do nothing } - if (hasFailure) { - context->resp.markFailure(); - } - - // Adjust the latency - auto latency = result.get_latency_in_us(); - context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start); - - // Keep the response - context->resp.addResponse(std::move(resp)); + } + if (hasFailure) { + context->resp.markFailure(); } + // Adjust the latency + auto latency = result.get_latency_in_us(); + context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start); + + // Keep the response + context->resp.addResponse(std::move(resp)); + }) + .thenError(folly::tag_t{}, + [this, context, host, spaceId](TransportException&& ex) { + auto& r = context->findRequest(host); + auto parts = getReqPartsId(r); + if (ex.getType() == TransportException::TIMED_OUT) { + LOG(ERROR) << "Request to " << host << " time out: " << ex.what(); + } else { + invalidLeader(spaceId, parts); + LOG(ERROR) << "Request to " << host << " failed: " << ex.what(); + } + context->resp.appendFailedParts(parts, + nebula::cpp2::ErrorCode::E_RPC_FAILURE); + context->resp.markFailure(); + }) + .thenError(folly::tag_t{}, + [this, context, host, spaceId](std::exception&& ex) { + auto& r = context->findRequest(host); + auto parts = getReqPartsId(r); + LOG(ERROR) << "Request to " << host << " failed: " << ex.what(); + invalidLeader(spaceId, parts); + context->resp.appendFailedParts(parts, + nebula::cpp2::ErrorCode::E_RPC_FAILURE); + context->resp.markFailure(); + }) + .ensure([context, host] { if (context->removeRequest(host)) { // Received all responses context->promise.setValue(std::move(context->resp)); @@ -200,75 +216,76 @@ folly::SemiFuture> StorageClientBase::c template template folly::Future> StorageClientBase::getResponse( - folly::EventBase* evb, - std::pair&& request, - RemoteFunc&& remoteFunc, - folly::Promise> pro) { - auto f = pro.getFuture(); - getResponseImpl(evb, - std::forward(request), - std::forward(remoteFunc), - std::move(pro)); + folly::EventBase* evb, std::pair&& request, RemoteFunc&& remoteFunc) { + auto pro = std::make_shared>>(); + auto f = pro->getFuture(); + getResponseImpl( + evb, std::forward(request), std::forward(remoteFunc), pro); return f; } template template -void StorageClientBase::getResponseImpl(folly::EventBase* evb, - std::pair request, - RemoteFunc remoteFunc, - folly::Promise> pro) { +void StorageClientBase::getResponseImpl( + folly::EventBase* evb, + std::pair request, + RemoteFunc remoteFunc, + std::shared_ptr>> pro) { + using TransportException = apache::thrift::transport::TTransportException; if (evb == nullptr) { DCHECK(!!ioThreadPool_); evb = ioThreadPool_->getEventBase(); } - folly::via(evb, - [evb, - request = std::move(request), - remoteFunc = std::move(remoteFunc), - pro = std::move(pro), - this]() mutable { - auto host = request.first; - auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms); - auto spaceId = request.second.get_space_id(); - auto partsId = getReqPartsId(request.second); - LOG(INFO) << "Send request to storage " << host; - remoteFunc(client.get(), request.second) - .via(evb) - .then([spaceId, - partsId = std::move(partsId), - p = std::move(pro), - request = std::move(request), - remoteFunc = std::move(remoteFunc), - this](folly::Try&& t) mutable { - // exception occurred during RPC - if (t.hasException()) { - p.setValue(Status::Error(folly::stringPrintf( - "RPC failure in StorageClient: %s", t.exception().what().c_str()))); - invalidLeader(spaceId, partsId); - return; - } - auto&& resp = std::move(t.value()); - // leader changed - auto& result = resp.get_result(); - for (auto& code : result.get_failed_parts()) { - VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code " - << static_cast(code.get_code()); - if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - auto* leader = code.get_leader(); - if (isValidHostPtr(leader)) { - updateLeader(spaceId, code.get_part_id(), *leader); + folly::via( + evb, + [evb, request = std::move(request), remoteFunc = std::move(remoteFunc), pro, this]() mutable { + auto host = request.first; + auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms); + auto spaceId = request.second.get_space_id(); + auto partsId = getReqPartsId(request.second); + LOG(INFO) << "Send request to storage " << host; + remoteFunc(client.get(), request.second) + .via(evb) + .thenValue([spaceId, pro, this](Response&& resp) mutable { + auto& result = resp.get_result(); + for (auto& code : result.get_failed_parts()) { + VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code " + << static_cast(code.get_code()); + if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { + auto* leader = code.get_leader(); + if (isValidHostPtr(leader)) { + updateLeader(spaceId, code.get_part_id(), *leader); + } else { + invalidLeader(spaceId, code.get_part_id()); + } + } else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND || + code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) { + invalidLeader(spaceId, code.get_part_id()); + } + } + pro->setValue(std::move(resp)); + }) + .thenError(folly::tag_t{}, + [spaceId, partsId = std::move(partsId), host, pro, this]( + TransportException&& ex) mutable { + if (ex.getType() == TransportException::TIMED_OUT) { + LOG(ERROR) << "Request to " << host << " time out: " << ex.what(); } else { - invalidLeader(spaceId, code.get_part_id()); + invalidLeader(spaceId, partsId); + LOG(ERROR) << "Request to " << host << " failed: " << ex.what(); } - } else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND || - code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) { - invalidLeader(spaceId, code.get_part_id()); - } - } - p.setValue(std::move(resp)); - }); - }); // via + pro->setValue(Status::Error( + folly::stringPrintf("RPC failure in StorageClient: %s", ex.what()))); + }) + .thenError(folly::tag_t{}, + [spaceId, partsId = std::move(partsId), host, pro, this]( + std::exception&& ex) mutable { + // exception occurred during RPC + pro->setValue(Status::Error( + folly::stringPrintf("RPC failure in StorageClient: %s", ex.what()))); + invalidLeader(spaceId, partsId); + }); + }); // via } template diff --git a/src/clients/storage/StorageClientBase.h b/src/clients/storage/StorageClientBase.h index 2a2538a3a3e..5ee928af24b 100644 --- a/src/clients/storage/StorageClientBase.h +++ b/src/clients/storage/StorageClientBase.h @@ -142,11 +142,9 @@ class StorageClientBase { class RemoteFunc, class Response = typename std::result_of::type::value_type> - folly::Future> getResponse( - folly::EventBase* evb, - std::pair&& request, - RemoteFunc&& remoteFunc, - folly::Promise> pro = folly::Promise>()); + folly::Future> getResponse(folly::EventBase* evb, + std::pair&& request, + RemoteFunc&& remoteFunc); template request, RemoteFunc remoteFunc, - folly::Promise> pro); + std::shared_ptr>> pro); // Cluster given ids into the host they belong to // The method returns a map diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 5f42bb27bbe..c882f67f91e 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -156,24 +156,10 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) { } void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr req) { - sendAppendLogRequest(eb, std::move(req)) + using TransportException = apache::thrift::transport::TTransportException; + sendAppendLogRequest(eb, req) .via(eb) - .then([eb, self = shared_from_this()](folly::Try&& t) { - VLOG(3) << self->idStr_ << "appendLogs() call got response"; - if (t.hasException()) { - VLOG(2) << self->idStr_ << t.exception().what(); - cpp2::AppendLogResponse r; - r.set_error_code(cpp2::ErrorCode::E_EXCEPTION); - { - std::lock_guard g(self->lock_); - self->setResponse(r); - self->lastLogIdSent_ = self->logIdToSend_ - 1; - } - self->noMoreRequestCV_.notify_all(); - return; - } - - cpp2::AppendLogResponse resp = std::move(t).value(); + .thenValue([eb, self = shared_from_this()](cpp2::AppendLogResponse&& resp) { LOG_IF(INFO, FLAGS_trace_raft) << self->idStr_ << "AppendLogResponse " << "code " << apache::thrift::util::enumNameSafe(resp.get_error_code()) << ", currTerm " @@ -358,6 +344,43 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr{}, + [self = shared_from_this(), req](TransportException&& ex) { + VLOG(2) << self->idStr_ << ex.what(); + cpp2::AppendLogResponse r; + r.set_error_code(cpp2::ErrorCode::E_EXCEPTION); + { + std::lock_guard g(self->lock_); + if (ex.getType() == TransportException::TIMED_OUT) { + VLOG(2) << self->idStr_ << "append log time out" + << ", space " << req->get_space() << ", part " << req->get_part() + << ", current term " << req->get_current_term() << ", last_log_id " + << req->get_last_log_id() << ", committed_id " + << req->get_committed_log_id() << ", last_log_term_sent" + << req->get_last_log_term_sent() << ", last_log_id_sent " + << req->get_last_log_id_sent() << ", logs size " + << req->get_log_str_list().size(); + } + self->setResponse(r); + self->lastLogIdSent_ = self->logIdToSend_ - 1; + } + // a new raft log or heartbeat will trigger another appendLogs in Host + self->noMoreRequestCV_.notify_all(); + return; + }) + .thenError(folly::tag_t{}, [self = shared_from_this()](std::exception&& ex) { + VLOG(2) << self->idStr_ << ex.what(); + cpp2::AppendLogResponse r; + r.set_error_code(cpp2::ErrorCode::E_EXCEPTION); + { + std::lock_guard g(self->lock_); + self->setResponse(r); + self->lastLogIdSent_ = self->logIdToSend_ - 1; + } + // a new raft log or heartbeat will trigger another appendLogs in Host + self->noMoreRequestCV_.notify_all(); + return; }); }