[Raft] Rollback-related optimizations #2903

Closed · wants to merge 5 commits
Changes from 4 commits
2 changes: 1 addition & 1 deletion conf/nebula-storaged.conf.production
@@ -100,7 +100,7 @@
--enable_rocksdb_prefix_filtering=false

############### misc ####################
--snapshot_part_rate_limit=8388608
--snapshot_part_rate_limit=10485760
--snapshot_batch_size=1048576
--rebuild_index_part_rate_limit=4194304
--rebuild_index_batch_size=1048576
92 changes: 54 additions & 38 deletions src/clients/storage/StorageClientBase-inl.h
@@ -115,6 +115,7 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClientBase<ClientType>::c
folly::EventBase* evb,
std::unordered_map<HostAddr, Request> requests,
RemoteFunc&& remoteFunc) {
using TransportException = apache::thrift::transport::TTransportException;
auto context = std::make_shared<ResponseContext<Request, RemoteFunc, Response>>(
requests.size(), std::move(remoteFunc));

@@ -137,49 +138,64 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClientBase<ClientType>::c
// Since all requests are sent using the same eventbase, all
// then-callback will be executed on the same IO thread
.via(evb)
.then([this, context, host, spaceId, start](folly::Try<Response>&& val) {
auto& r = context->findRequest(host);
if (val.hasException()) {
LOG(ERROR) << "Request to " << host << " failed: " << val.exception().what();
auto parts = getReqPartsId(r);
context->resp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE);
invalidLeader(spaceId, parts);
context->resp.markFailure();
} else {
auto resp = std::move(val.value());
auto& result = resp.get_result();
bool hasFailure{false};
for (auto& code : result.get_failed_parts()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
<< static_cast<int32_t>(code.get_code());
hasFailure = true;
context->resp.emplaceFailedPart(code.get_part_id(), code.get_code());
if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
auto* leader = code.get_leader();
if (isValidHostPtr(leader)) {
updateLeader(spaceId, code.get_part_id(), *leader);
} else {
invalidLeader(spaceId, code.get_part_id());
}
} else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
invalidLeader(spaceId, code.get_part_id());
.thenValue([this, context, host, spaceId, start](Response&& resp) {
auto& result = resp.get_result();
bool hasFailure{false};
for (auto& code : result.get_failed_parts()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
<< static_cast<int32_t>(code.get_code());
hasFailure = true;
context->resp.emplaceFailedPart(code.get_part_id(), code.get_code());
if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
auto* leader = code.get_leader();
if (isValidHostPtr(leader)) {
updateLeader(spaceId, code.get_part_id(), *leader);
} else {
// do nothing
invalidLeader(spaceId, code.get_part_id());
}
} else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
invalidLeader(spaceId, code.get_part_id());
} else {
// do nothing
}
if (hasFailure) {
context->resp.markFailure();
}

// Adjust the latency
auto latency = result.get_latency_in_us();
context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start);

// Keep the response
context->resp.addResponse(std::move(resp));
}
if (hasFailure) {
context->resp.markFailure();
}

// Adjust the latency
auto latency = result.get_latency_in_us();
context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start);

// Keep the response
context->resp.addResponse(std::move(resp));
})
.thenError(folly::tag_t<TransportException>{},
[this, context, host, spaceId](TransportException&& ex) {
auto& r = context->findRequest(host);
auto parts = getReqPartsId(r);
if (ex.getType() == TransportException::TIMED_OUT) {
LOG(ERROR) << "Request to " << host << " time out: " << ex.what();
} else {
invalidLeader(spaceId, parts);
LOG(ERROR) << "Request to " << host << " failed: " << ex.what();
}
context->resp.appendFailedParts(parts,
nebula::cpp2::ErrorCode::E_RPC_FAILURE);
context->resp.markFailure();
})
.thenError(folly::tag_t<std::exception>{},
[this, context, host, spaceId](std::exception&& ex) {
auto& r = context->findRequest(host);
auto parts = getReqPartsId(r);
LOG(ERROR) << "Request to " << host << " failed: " << ex.what();
invalidLeader(spaceId, parts);
context->resp.appendFailedParts(parts,
nebula::cpp2::ErrorCode::E_RPC_FAILURE);
context->resp.markFailure();
})
.ensure([context, host] {
Review comment (Contributor): Good improvement! It's what I wanted to do. Maybe we can do more for all requests with folly::collect.

if (context->removeRequest(host)) {
// Received all responses
context->promise.setValue(std::move(context->resp));
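The rewrite above splits the old `.then(folly::Try<Response>&&)` callback into a `.thenValue` handler for the success path plus typed `.thenError` handlers, so a transport timeout can be treated differently from other RPC failures. A minimal standalone sketch of that folly pattern follows; the `sendRpc` helper and the executor choice are hypothetical stand-ins for the real storage RPC:

```cpp
#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Future.h>
#include <thrift/lib/cpp/transport/TTransportException.h>

#include <iostream>

using TransportException = apache::thrift::transport::TTransportException;

// Hypothetical async call standing in for the storage RPC.
folly::SemiFuture<int> sendRpc() {
  return folly::makeSemiFuture(42);
}

int main() {
  sendRpc()
      .via(&folly::InlineExecutor::instance())
      .thenValue([](int resp) {
        // Success path: only reached when the RPC produced a value.
        std::cout << "response: " << resp << std::endl;
      })
      .thenError(folly::tag_t<TransportException>{},
                 [](TransportException&& ex) {
                   // Transport-level failures; TIMED_OUT can be told apart
                   // from other errors via ex.getType(), as the diff above does.
                   std::cerr << "transport error: " << ex.what() << std::endl;
                 })
      .thenError(folly::tag_t<std::exception>{}, [](std::exception&& ex) {
        // Catch-all for any other exception type.
        std::cerr << "error: " << ex.what() << std::endl;
      })
      .get();  // block here only to keep the example self-contained
  return 0;
}
```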
4 changes: 2 additions & 2 deletions src/kvstore/NebulaSnapshotManager.cpp
@@ -11,7 +11,7 @@
#include "kvstore/RateLimiter.h"

DEFINE_uint32(snapshot_part_rate_limit,
1024 * 1024 * 2,
1024 * 1024 * 10,
"max bytes of pulling snapshot for each partition in one second");
DEFINE_uint32(snapshot_batch_size, 1024 * 512, "batch size for snapshot, in bytes");

@@ -22,7 +22,7 @@ const int32_t kReserveNum = 1024 * 4;

NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
// Snapshot rate is limited to FLAGS_snapshot_worker_threads * FLAGS_snapshot_part_rate_limit.
// So by default, the total send rate is limited to 4 * 2Mb = 8Mb.
// So by default, the total send rate is limited to 4 * 10Mb = 40Mb.
LOG(INFO) << "Send snapshot is rate limited to " << FLAGS_snapshot_part_rate_limit
<< " for each part";
}
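The new default pulls each partition at 10485760 bytes (10 MiB) per second; with the 4 snapshot worker threads assumed in the comment above, the aggregate send rate is about 40 MiB/s. A quick back-of-envelope check (the thread count is taken from that comment, not from this diff):

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // --snapshot_part_rate_limit: new default / production value in this PR.
  constexpr uint64_t kPartRateLimit = 1024ULL * 1024 * 10;  // 10 MiB per part per second
  // FLAGS_snapshot_worker_threads, assumed to default to 4 per the comment above.
  constexpr uint64_t kWorkerThreads = 4;
  std::cout << "total snapshot send rate: "
            << kPartRateLimit * kWorkerThreads / (1024 * 1024) << " MiB/s\n";
  return 0;
}
```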
72 changes: 51 additions & 21 deletions src/kvstore/raftex/Host.cpp
@@ -34,7 +34,8 @@ Host::Host(const HostAddr& addr, std::shared_ptr<RaftPart> part, bool isLearner)
isLearner_(isLearner),
idStr_(folly::stringPrintf(
"%s[Host: %s:%d] ", part_->idStr_.c_str(), addr_.host.c_str(), addr_.port)),
cachingPromise_(folly::SharedPromise<cpp2::AppendLogResponse>()) {}
cachingPromise_(folly::SharedPromise<cpp2::AppendLogResponse>()),
rpcTimeout_(FLAGS_raft_rpc_timeout_ms) {}

void Host::waitForStop() {
std::unique_lock<std::mutex> g(lock_);
@@ -156,31 +157,18 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) {
}

void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req) {
sendAppendLogRequest(eb, std::move(req))
using TransportException = apache::thrift::transport::TTransportException;
sendAppendLogRequest(eb, req)
.via(eb)
.then([eb, self = shared_from_this()](folly::Try<cpp2::AppendLogResponse>&& t) {
VLOG(3) << self->idStr_ << "appendLogs() call got response";
if (t.hasException()) {
VLOG(2) << self->idStr_ << t.exception().what();
cpp2::AppendLogResponse r;
r.set_error_code(cpp2::ErrorCode::E_EXCEPTION);
{
std::lock_guard<std::mutex> g(self->lock_);
self->setResponse(r);
self->lastLogIdSent_ = self->logIdToSend_ - 1;
}
self->noMoreRequestCV_.notify_all();
return;
}

cpp2::AppendLogResponse resp = std::move(t).value();
.thenValue([eb, self = shared_from_this()](cpp2::AppendLogResponse&& resp) {
LOG_IF(INFO, FLAGS_trace_raft)
<< self->idStr_ << "AppendLogResponse "
<< "code " << apache::thrift::util::enumNameSafe(resp.get_error_code()) << ", currTerm "
<< resp.get_current_term() << ", lastLogId " << resp.get_last_log_id()
<< ", lastLogTerm " << resp.get_last_log_term() << ", commitLogId "
<< resp.get_committed_log_id() << ", lastLogIdSent_ " << self->lastLogIdSent_
<< ", lastLogTermSent_ " << self->lastLogTermSent_;
self->rpcTimeout_ = FLAGS_raft_rpc_timeout_ms;
switch (resp.get_error_code()) {
case cpp2::ErrorCode::SUCCEEDED: {
VLOG(2) << self->idStr_ << "AppendLog request sent successfully";
@@ -358,6 +346,47 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::Append
return;
}
}
})
.thenError(folly::tag_t<TransportException>{},
[self = shared_from_this(), req](TransportException&& ex) {
VLOG(2) << self->idStr_ << ex.what();
cpp2::AppendLogResponse r;
r.set_error_code(cpp2::ErrorCode::E_EXCEPTION);
{
std::lock_guard<std::mutex> g(self->lock_);
if (ex.getType() == TransportException::TIMED_OUT) {
VLOG(2) << self->idStr_ << "append log time out"
<< ", space " << req->get_space() << ", part " << req->get_part()
<< ", current term " << req->get_current_term() << ", last_log_id "
<< req->get_last_log_id() << ", committed_id "
<< req->get_committed_log_id() << ", last_log_term_sent "
<< req->get_last_log_term_sent() << ", last_log_id_sent "
<< req->get_last_log_id_sent()
<< ", set lastLogIdSent_ to logIdToSend_ " << self->logIdToSend_
<< ", logs size " << req->get_log_str_list().size();
if ((self->rpcTimeout_ << 1) < FLAGS_raft_heartbeat_interval_secs * 1000) {
self->rpcTimeout_ <<= 1;
}
}
self->setResponse(r);
self->lastLogIdSent_ = self->logIdToSend_ - 1;
}
// a new raft log or heartbeat will trigger another appendLogs in Host
self->noMoreRequestCV_.notify_all();
return;
})
.thenError(folly::tag_t<std::exception>{}, [self = shared_from_this()](std::exception&& ex) {
VLOG(2) << self->idStr_ << ex.what();
cpp2::AppendLogResponse r;
r.set_error_code(cpp2::ErrorCode::E_EXCEPTION);
{
std::lock_guard<std::mutex> g(self->lock_);
self->setResponse(r);
self->lastLogIdSent_ = self->logIdToSend_ - 1;
}
// a new raft log or heartbeat will trigger another appendLogs in Host
self->noMoreRequestCV_.notify_all();
return;
});
}

@@ -444,11 +473,12 @@ folly::Future<cpp2::AppendLogResponse> Host::sendAppendLogRequest(
<< ", part " << req->get_part() << ", current term "
<< req->get_current_term() << ", last_log_id "
<< req->get_last_log_id() << ", committed_id "
<< req->get_committed_log_id() << ", last_log_term_sent"
<< req->get_committed_log_id() << ", last_log_term_sent "
<< req->get_last_log_term_sent() << ", last_log_id_sent "
<< req->get_last_log_id_sent();
<< req->get_last_log_id_sent() << ", logs in request "
<< req->get_log_str_list().size();
// Get client connection
auto client = part_->clientMan_->client(addr_, eb, false, FLAGS_raft_rpc_timeout_ms);
auto client = part_->clientMan_->client(addr_, eb, false, rpcTimeout_);
return client->future_appendLog(*req);
}

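The new `rpcTimeout_` member makes the append-log RPC timeout adaptive: it starts at `FLAGS_raft_rpc_timeout_ms`, doubles after every timed-out request while staying below the heartbeat interval, and resets to the flag default as soon as any response arrives. A standalone sketch of that backoff policy, using a hypothetical wrapper struct with the flag values passed in as example parameters:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the rpcTimeout_ handling added to Host.
struct AdaptiveRpcTimeout {
  uint32_t timeoutMs;            // current timeout, starts at the default
  uint32_t defaultTimeoutMs;     // FLAGS_raft_rpc_timeout_ms
  uint32_t heartbeatIntervalMs;  // FLAGS_raft_heartbeat_interval_secs * 1000

  // Called when an append-log RPC fails with TTransportException::TIMED_OUT.
  void onTimeout() {
    // Double the timeout, but keep it below the heartbeat interval so one
    // slow follower cannot push the RPC timeout past the heartbeat period.
    if ((timeoutMs << 1) < heartbeatIntervalMs) {
      timeoutMs <<= 1;
    }
  }

  // Called whenever an AppendLogResponse is received.
  void onResponse() { timeoutMs = defaultTimeoutMs; }
};

int main() {
  AdaptiveRpcTimeout t{500, 500, 5000};  // example values: 500 ms default, 5 s heartbeat
  t.onTimeout();
  t.onTimeout();
  t.onTimeout();
  assert(t.timeoutMs == 4000);  // 500 -> 1000 -> 2000 -> 4000
  t.onTimeout();
  assert(t.timeoutMs == 4000);  // doubling again would reach 8000 >= 5000, so it is capped
  t.onResponse();
  assert(t.timeoutMs == 500);   // any response resets to the default
  return 0;
}
```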
2 changes: 2 additions & 0 deletions src/kvstore/raftex/Host.h
@@ -140,6 +140,8 @@ class Host final : public std::enable_shared_from_this<Host> {

// CommittedLogId of follower
LogID followerCommittedLogId_{0};

uint32_t rpcTimeout_;
};

} // namespace raftex