-
Notifications
You must be signed in to change notification settings - Fork 240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Notify user code about follower loss #517
Changes from 1 commit
ab777fc
e0400a5
b2d0b07
e45952a
480b1ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -302,6 +302,10 @@ public: | |
ptr<req_msg> get_rsv_msg() const { return rsv_msg_; } | ||
rpc_handler get_rsv_msg_handler() const { return rsv_msg_handler_; } | ||
|
||
bool IsLost() const { return lost_by_leader_; } | ||
void SetLost() { lost_by_leader_ = true; } | ||
void SetRecovered() { lost_by_leader_ = false; } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please follow the naming convention: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
|
||
private: | ||
void handle_rpc_result(ptr<peer> myself, | ||
ptr<rpc_client> my_rpc_client, | ||
|
@@ -498,6 +502,8 @@ private: | |
*/ | ||
std::atomic<bool> abandoned_; | ||
|
||
std::atomic<bool> lost_by_leader_ {false}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add comment about this member variable. Also, instead of brace initializer here, please put the default value into constructor, to be aligned with others. |
||
|
||
/** | ||
* Reserved message that should be sent next time. | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -608,7 +608,25 @@ int32 raft_server::get_leadership_expiry() { | |
return expiry; | ||
} | ||
|
||
size_t raft_server::get_not_responding_peers() { | ||
std::vector<ptr<peer>> raft_server::get_not_responding_peers() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems to me There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, changed to |
||
ptr<raft_params> params = ctx_->get_params(); | ||
const auto expiry = | ||
params->heart_beat_interval_ * raft_server::raft_limits_.response_limit_; | ||
std::vector<ptr<peer>> rs; | ||
for (const auto& [id, peer_ptr]: peers_) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is a c++17 feature, and this project is based on c++11. Please use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, my bad, sorry. Fixed. |
||
if (!is_regular_member(peer_ptr)) { | ||
continue; | ||
} | ||
const auto resp_elapsed_ms = | ||
static_cast<int32>(peer_ptr->get_resp_timer_us() / 1000); | ||
if (resp_elapsed_ms > expiry) { | ||
rs.push_back(peer_ptr); | ||
} | ||
} | ||
return rs; | ||
} | ||
|
||
size_t raft_server::get_not_responding_peers_count() { | ||
// Check if quorum nodes are not responding | ||
// (i.e., don't respond 20x heartbeat time long). | ||
size_t num_not_resp_nodes = 0; | ||
|
@@ -804,6 +822,14 @@ void raft_server::handle_peer_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err) | |
} else if (rpc_errs == raft_server::raft_limits_.warning_limit_) { | ||
p_wn("too verbose RPC error on peer (%d), " | ||
"will suppress it from now", peer_id); | ||
if (!pp || !pp->IsLost()) { | ||
if (pp) { | ||
pp->SetLost(); | ||
} | ||
cb_func::Param param(id_, leader_, peer_id); | ||
const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, &param); | ||
assert(rc == cb_func::ReturnCode::Ok); | ||
} | ||
} | ||
|
||
if (pp && pp->is_leave_flag_set()) { | ||
|
@@ -837,6 +863,9 @@ void raft_server::handle_peer_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err) | |
peer* pp = entry->second.get(); | ||
int rpc_errs = pp->get_rpc_errs(); | ||
if (rpc_errs >= raft_server::raft_limits_.warning_limit_) { | ||
if (pp) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the flow enters here only when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't guarantee that a shared pointer has a value, does it? But I see that you don't check this at the line above ( |
||
pp->SetRecovered(); | ||
} | ||
p_wn("recovered from RPC failure from peer %d, %d errors", | ||
resp->get_src(), rpc_errs); | ||
} | ||
|
@@ -1085,7 +1114,7 @@ bool raft_server::check_leadership_validity() { | |
int32 num_voting_members = get_num_voting_members(); | ||
|
||
int leadership_expiry = get_leadership_expiry(); | ||
int32 nr_peers = (int32)get_not_responding_peers(); | ||
int32 nr_peers = (int32)get_not_responding_peers_count(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since const auto nr_peers_list = get_not_responding_peers();
int32 nr_peers = nr_peers_list.size(); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Regarding this point:
But if you are ok with this overhead, please let me know and I will change the code. Also, please pay attention, I have changed the code of the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good point, your change looks good. |
||
if (leadership_expiry < 0) { | ||
// Negative expiry: leadership will never expire. | ||
nr_peers = 0; | ||
|
@@ -1102,6 +1131,18 @@ bool raft_server::check_leadership_validity() { | |
get_leadership_expiry(), | ||
min_quorum_size); | ||
|
||
const auto nr_peers_list = get_not_responding_peers(); | ||
assert(nr_peers_list.size() == nr_peers); | ||
for (auto& peer : nr_peers_list) { | ||
if (peer->IsLost()) { | ||
continue; | ||
} | ||
peer->SetLost(); | ||
cb_func::Param param(id_, leader_, peer->get_id()); | ||
const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, &param); | ||
assert(rc == cb_func::ReturnCode::Ok); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use 4-space indent. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
|
||
// NOTE: | ||
// For a cluster where the number of members is the same | ||
// as the size of quorum, we should not expire leadership, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is no other special reason, please use
28
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed