Skip to content

Commit

Permalink
Optimize the write performance when host is down (#5571)
Browse files Browse the repository at this point in the history
* Optimize the write performance when host is down

* fix the comments

---------

Co-authored-by: Sophie <[email protected]>
  • Loading branch information
luyade and Sophie-Xie committed Jul 20, 2023
1 parent 512d65f commit 3f953cd
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 7 deletions.
48 changes: 45 additions & 3 deletions src/kvstore/raftex/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ DEFINE_uint32(max_appendlog_batch_size,
"The max number of logs in each appendLog request batch");
DEFINE_uint32(max_outstanding_requests, 1024, "The max number of outstanding appendLog requests");
DEFINE_int32(raft_rpc_timeout_ms, 1000, "rpc timeout for raft client");
// Multiplier applied to raft_heartbeat_interval_secs. When a heartbeat fails with a NOT_OPEN
// transport exception, the host is marked as paused once at least
// pause_host_time_factor * raft_heartbeat_interval_secs seconds have elapsed since the last
// recorded heartbeat time.
DEFINE_int32(pause_host_time_factor,
4,
"The factor of pause host time based on raft heartbeat interval");

DECLARE_bool(trace_raft);
DECLARE_uint32(raft_heartbeat_interval_secs);
Expand Down Expand Up @@ -60,11 +63,22 @@ nebula::cpp2::ErrorCode Host::canAppendLog() const {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

// Tells whether a heartbeat or AskForVote request may be sent to the peer.
// Only stopped_ is consulted here; paused_ does not affect the result.
nebula::cpp2::ErrorCode Host::canSendHBOrVote() const {
  // lock_ is expected to be held already: the check fails if try_lock() succeeds.
  CHECK(!lock_.try_lock());

  // Common case first: a host that is not stopped may always send.
  if (!stopped_) {
    return nebula::cpp2::ErrorCode::SUCCEEDED;
  }

  VLOG(2) << idStr_ << "The host is stopped, just return";
  return nebula::cpp2::ErrorCode::E_RAFT_HOST_STOPPED;
}

folly::Future<cpp2::AskForVoteResponse> Host::askForVote(const cpp2::AskForVoteRequest& req,
folly::EventBase* eb) {
{
std::lock_guard<std::mutex> g(lock_);
if (stopped_) {
auto res = canSendHBOrVote();
if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(3) << idStr_ << "The Host is not in a proper status, do not send";
cpp2::AskForVoteResponse resp;
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_HOST_STOPPED;
Expand Down Expand Up @@ -410,11 +424,39 @@ folly::Future<cpp2::HeartbeatResponse> Host::sendHeartbeat(
pro = std::move(promise)](folly::Try<cpp2::HeartbeatResponse>&& t) mutable {
VLOG(4) << self->idStr_ << "heartbeat call got response";
if (t.hasException()) {
using TransportException = apache::thrift::transport::TTransportException;
auto exWrapper = std::move(t).exception();
auto exception = exWrapper.get_exception<TransportException>();
VLOG(2) << self->idStr_ << "Heartbeat: " << exception->what();
// If we keeps receiving NOT_OPEN exception after some HB intervals,
// we can assume that the peer is down so we mark paused_ as true
if (exception && exception->getType() == TransportException::NOT_OPEN) {
if (!self->paused_) {
auto now = time::WallClock::fastNowInMilliSec();
if (now - self->lastHeartbeatTime_ >=
FLAGS_pause_host_time_factor * FLAGS_raft_heartbeat_interval_secs * 1000) {
LOG(WARNING) << self->idStr_
<< "Pasue this host because long time no heartbeat response";
std::lock_guard<std::mutex> g(self->lock_);
self->paused_ = true;
}
}
}
cpp2::HeartbeatResponse resp;
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_RPC_EXCEPTION;
pro.setValue(std::move(resp));
return;
} else {
auto& resp = t.value();
if (resp.error_code_ref() == nebula::cpp2::ErrorCode::SUCCEEDED) {
std::lock_guard<std::mutex> g(self->lock_);
// If the peer is back online and ready, we set paused_ as false,
// the leader can then resume sending appendLog request to this peer
if (self->paused_) {
self->paused_ = false;
}
}
self->setLastHeartbeatTime(time::WallClock::fastNowInMilliSec());
pro.setValue(std::move(t.value()));
}
});
Expand All @@ -427,7 +469,7 @@ folly::Future<cpp2::HeartbeatResponse> Host::sendHeartbeatRequest(

{
std::lock_guard<std::mutex> g(lock_);
auto res = canAppendLog();
auto res = canSendHBOrVote();
if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(3) << idStr_ << "The Host is not in a proper status, do not send";
cpp2::HeartbeatResponse resp;
Expand Down Expand Up @@ -459,8 +501,8 @@ std::shared_ptr<cpp2::AppendLogRequest> Host::getPendingReqIfAny(std::shared_ptr

// Check if there are any pending request to send
if (self->noRequest()) {
self->noMoreRequestCV_.notify_all();
self->requestOnGoing_ = false;
self->noMoreRequestCV_.notify_all();
return nullptr;
}

Expand Down
13 changes: 13 additions & 0 deletions src/kvstore/raftex/Host.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ class Host final : public std::enable_shared_from_this<Host> {
*/
nebula::cpp2::ErrorCode canAppendLog() const;

/**
 * @brief Whether Host can send HB or AskForVote request to the peer.
 *        Only the stopped_ flag is checked; paused_ is not considered.
 *        lock_ is expected to be held when this is called (the implementation
 *        CHECKs that lock_.try_lock() fails).
 *
 * @return nebula::cpp2::ErrorCode E_RAFT_HOST_STOPPED if the host is stopped,
 *         SUCCEEDED otherwise
 */
nebula::cpp2::ErrorCode canSendHBOrVote() const;

/**
* @brief Send append log rpc
*
Expand Down Expand Up @@ -244,6 +251,12 @@ class Host final : public std::enable_shared_from_this<Host> {

mutable std::mutex lock_;

// If stopped_ is true, we will not send any request to the peer.
// If stopped_ is false:
// 1. No matter whether paused_ is true or not, we can send HB requests and AskForVote requests;
// 2. Only if paused_ is false can we also send appendLog requests (in addition to HB
// requests and AskForVote requests).
// See canAppendLog() and canSendHBOrVote()
bool paused_{false};
bool stopped_{false};

Expand Down
4 changes: 0 additions & 4 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2097,10 +2097,6 @@ void RaftPart::sendHeartbeat() {
if (!hosts[resp.first]->isLearner() &&
resp.second.get_error_code() == nebula::cpp2::ErrorCode::SUCCEEDED) {
++numSucceeded;
// only metad 0 space 0 part need this state now.
if (spaceId_ == kDefaultSpaceId) {
hosts[resp.first]->setLastHeartbeatTime(time::WallClock::fastNowInMilliSec());
}
}
highestTerm = std::max(highestTerm, resp.second.get_current_term());
}
Expand Down

0 comments on commit 3f953cd

Please sign in to comment.