From ab777fc8f3b3ae68390d5e29b239c9a89fc04043 Mon Sep 17 00:00:00 2001 From: Pavel Yurin Date: Tue, 18 Jun 2024 12:27:02 +0800 Subject: [PATCH 1/4] Notify user code about follower loss --- include/libnuraft/callback.hxx | 7 +++++ include/libnuraft/peer.hxx | 6 +++++ include/libnuraft/raft_server.hxx | 3 ++- src/handle_append_entries.cxx | 4 +-- src/raft_server.cxx | 45 +++++++++++++++++++++++++++++-- 5 files changed, 60 insertions(+), 5 deletions(-) diff --git a/include/libnuraft/callback.hxx b/include/libnuraft/callback.hxx index d75b255b..b03777c2 100644 --- a/include/libnuraft/callback.hxx +++ b/include/libnuraft/callback.hxx @@ -207,6 +207,13 @@ public: */ ResignationFromLeader = 27, + /** + * When a peer RPC errors count exceeds raft_server::limits.warning_limit_, or + * a peer doesn't respond for a long time (raft_params::leadership_expiry_), + * the peer is considered lost. + * ctx: null. + */ + FollowerLost = 100, }; struct Param { diff --git a/include/libnuraft/peer.hxx b/include/libnuraft/peer.hxx index 895d313a..27c46f2f 100644 --- a/include/libnuraft/peer.hxx +++ b/include/libnuraft/peer.hxx @@ -302,6 +302,10 @@ public: ptr get_rsv_msg() const { return rsv_msg_; } rpc_handler get_rsv_msg_handler() const { return rsv_msg_handler_; } + bool IsLost() const { return lost_by_leader_; } + void SetLost() { lost_by_leader_ = true; } + void SetRecovered() { lost_by_leader_ = false; } + private: void handle_rpc_result(ptr myself, ptr my_rpc_client, @@ -498,6 +502,8 @@ private: */ std::atomic abandoned_; + std::atomic lost_by_leader_ {false}; + /** * Reserved message that should be sent next time. */ diff --git a/include/libnuraft/raft_server.hxx b/include/libnuraft/raft_server.hxx index 70faf49d..957de6f3 100644 --- a/include/libnuraft/raft_server.hxx +++ b/include/libnuraft/raft_server.hxx @@ -886,7 +886,8 @@ protected: int32 get_quorum_for_election(); int32 get_quorum_for_commit(); int32 get_leadership_expiry(); - size_t get_not_responding_peers(); + std::vector> get_not_responding_peers(); + size_t get_not_responding_peers_count(); size_t get_num_stale_peers(); ptr handle_append_entries(req_msg& req); diff --git a/src/handle_append_entries.cxx b/src/handle_append_entries.cxx index 0318487b..00461e00 100644 --- a/src/handle_append_entries.cxx +++ b/src/handle_append_entries.cxx @@ -162,7 +162,7 @@ bool raft_server::request_append_entries(ptr p) { chk_timer.timeout_and_reset() ) { // If auto adjust mode is on for 2-node cluster, and // the follower is not responding, adjust the quorum. - size_t num_not_responding_peers = get_not_responding_peers(); + size_t num_not_responding_peers = get_not_responding_peers_count(); size_t cur_quorum_size = get_quorum_for_commit(); size_t num_stale_peers = get_num_stale_peers(); if (cur_quorum_size >= 1) { @@ -1191,7 +1191,7 @@ ulong raft_server::get_expected_committed_log_idx() { size_t quorum_idx = get_quorum_for_commit(); if (ctx_->get_params()->use_full_consensus_among_healthy_members_) { - size_t not_responding_peers = get_not_responding_peers(); + size_t not_responding_peers = get_not_responding_peers_count(); if (not_responding_peers < voting_members - quorum_idx) { // If full consensus option is on, commit should be // agreed by all healthy members, and the number of diff --git a/src/raft_server.cxx b/src/raft_server.cxx index e331a319..37de15c3 100644 --- a/src/raft_server.cxx +++ b/src/raft_server.cxx @@ -608,7 +608,25 @@ int32 raft_server::get_leadership_expiry() { return expiry; } -size_t raft_server::get_not_responding_peers() { +std::vector> raft_server::get_not_responding_peers() { + ptr params = ctx_->get_params(); + const auto expiry = + params->heart_beat_interval_ * raft_server::raft_limits_.response_limit_; + std::vector> rs; + for (const auto& [id, peer_ptr]: peers_) { + if (!is_regular_member(peer_ptr)) { + continue; + } + const auto resp_elapsed_ms = + static_cast(peer_ptr->get_resp_timer_us() / 1000); + if (resp_elapsed_ms > expiry) { + rs.push_back(peer_ptr); + } + } + return rs; +} + +size_t raft_server::get_not_responding_peers_count() { // Check if quorum nodes are not responding // (i.e., don't respond 20x heartbeat time long). size_t num_not_resp_nodes = 0; @@ -804,6 +822,14 @@ void raft_server::handle_peer_resp(ptr& resp, ptr& err) } else if (rpc_errs == raft_server::raft_limits_.warning_limit_) { p_wn("too verbose RPC error on peer (%d), " "will suppress it from now", peer_id); + if (!pp || !pp->IsLost()) { + if (pp) { + pp->SetLost(); + } + cb_func::Param param(id_, leader_, peer_id); + const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, ¶m); + assert(rc == cb_func::ReturnCode::Ok); + } } if (pp && pp->is_leave_flag_set()) { @@ -837,6 +863,9 @@ void raft_server::handle_peer_resp(ptr& resp, ptr& err) peer* pp = entry->second.get(); int rpc_errs = pp->get_rpc_errs(); if (rpc_errs >= raft_server::raft_limits_.warning_limit_) { + if (pp) { + pp->SetRecovered(); + } p_wn("recovered from RPC failure from peer %d, %d errors", resp->get_src(), rpc_errs); } @@ -1085,7 +1114,7 @@ bool raft_server::check_leadership_validity() { int32 num_voting_members = get_num_voting_members(); int leadership_expiry = get_leadership_expiry(); - int32 nr_peers = (int32)get_not_responding_peers(); + int32 nr_peers = (int32)get_not_responding_peers_count(); if (leadership_expiry < 0) { // Negative expiry: leadership will never expire. nr_peers = 0; @@ -1102,6 +1131,18 @@ bool raft_server::check_leadership_validity() { get_leadership_expiry(), min_quorum_size); + const auto nr_peers_list = get_not_responding_peers(); + assert(nr_peers_list.size() == nr_peers); + for (auto& peer : nr_peers_list) { + if (peer->IsLost()) { + continue; + } + peer->SetLost(); + cb_func::Param param(id_, leader_, peer->get_id()); + const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, ¶m); + assert(rc == cb_func::ReturnCode::Ok); + } + // NOTE: // For a cluster where the number of members is the same // as the size of quorum, we should not expire leadership, From e0400a5227d3da8a4ee176b81daaefdb58089c62 Mon Sep 17 00:00:00 2001 From: Pavel Yurin Date: Wed, 26 Jun 2024 13:06:34 +0800 Subject: [PATCH 2/4] PR review fixes --- include/libnuraft/callback.hxx | 2 +- include/libnuraft/peer.hxx | 6 +-- include/libnuraft/raft_server.hxx | 4 +- src/raft_server.cxx | 72 +++++++++++++++---------------- 4 files changed, 41 insertions(+), 43 deletions(-) diff --git a/include/libnuraft/callback.hxx b/include/libnuraft/callback.hxx index b03777c2..de40f327 100644 --- a/include/libnuraft/callback.hxx +++ b/include/libnuraft/callback.hxx @@ -213,7 +213,7 @@ public: * the peer is considered lost. * ctx: null. */ - FollowerLost = 100, + FollowerLost = 28, }; struct Param { diff --git a/include/libnuraft/peer.hxx b/include/libnuraft/peer.hxx index 27c46f2f..ca20e88d 100644 --- a/include/libnuraft/peer.hxx +++ b/include/libnuraft/peer.hxx @@ -302,9 +302,9 @@ public: ptr get_rsv_msg() const { return rsv_msg_; } rpc_handler get_rsv_msg_handler() const { return rsv_msg_handler_; } - bool IsLost() const { return lost_by_leader_; } - void SetLost() { lost_by_leader_ = true; } - void SetRecovered() { lost_by_leader_ = false; } + bool is_lost() const { return lost_by_leader_; } + void set_lost() { lost_by_leader_ = true; } + void set_recovered() { lost_by_leader_ = false; } private: void handle_rpc_result(ptr myself, diff --git a/include/libnuraft/raft_server.hxx b/include/libnuraft/raft_server.hxx index 957de6f3..fe6ffceb 100644 --- a/include/libnuraft/raft_server.hxx +++ b/include/libnuraft/raft_server.hxx @@ -886,10 +886,12 @@ protected: int32 get_quorum_for_election(); int32 get_quorum_for_commit(); int32 get_leadership_expiry(); - std::vector> get_not_responding_peers(); + std::list> get_not_responding_peers(); size_t get_not_responding_peers_count(); size_t get_num_stale_peers(); + void apply_to_not_responding_peers(const std::function&)>&); + ptr handle_append_entries(req_msg& req); ptr handle_prevote_req(req_msg& req); ptr handle_vote_req(req_msg& req); diff --git a/src/raft_server.cxx b/src/raft_server.cxx index 37de15c3..ca89c24f 100644 --- a/src/raft_server.cxx +++ b/src/raft_server.cxx @@ -608,45 +608,43 @@ int32 raft_server::get_leadership_expiry() { return expiry; } -std::vector> raft_server::get_not_responding_peers() { - ptr params = ctx_->get_params(); - const auto expiry = - params->heart_beat_interval_ * raft_server::raft_limits_.response_limit_; - std::vector> rs; - for (const auto& [id, peer_ptr]: peers_) { - if (!is_regular_member(peer_ptr)) { - continue; - } - const auto resp_elapsed_ms = - static_cast(peer_ptr->get_resp_timer_us() / 1000); - if (resp_elapsed_ms > expiry) { - rs.push_back(peer_ptr); - } - } +std::list> raft_server::get_not_responding_peers() { + std::list> rs; + auto cb = [&rs](const ptr& peer_ptr) { + rs.push_back(peer_ptr); + }; + apply_to_not_responding_peers(cb); return rs; } size_t raft_server::get_not_responding_peers_count() { - // Check if quorum nodes are not responding - // (i.e., don't respond 20x heartbeat time long). size_t num_not_resp_nodes = 0; + auto cb = [&num_not_resp_nodes](const ptr&) { + ++num_not_resp_nodes; + }; + apply_to_not_responding_peers(cb); + return num_not_resp_nodes; +} +void raft_server::apply_to_not_responding_peers( + const std::function&)>& callback) { + // Check if quorum nodes are not responding + // (i.e., don't respond 20x heartbeat time long). ptr params = ctx_->get_params(); - int expiry = params->heart_beat_interval_ * - raft_server::raft_limits_.response_limit_; + int expiry = params->heart_beat_interval_ * raft_server::raft_limits_.response_limit_; - // Check the number of not responding peers. + // Check not responding peers. for (auto& entry: peers_) { - ptr p = entry.second; + const auto& peer_ptr = entry.second; - if (!is_regular_member(p)) continue; + if (!is_regular_member(peer_ptr)) continue; - int32 resp_elapsed_ms = (int32)(p->get_resp_timer_us() / 1000); - if ( resp_elapsed_ms > expiry ) { - num_not_resp_nodes++; + const auto resp_elapsed_ms = + static_cast(peer_ptr->get_resp_timer_us() / 1000); + if (resp_elapsed_ms > expiry) { + callback(peer_ptr); } } - return num_not_resp_nodes; } size_t raft_server::get_num_stale_peers() { @@ -822,9 +820,9 @@ void raft_server::handle_peer_resp(ptr& resp, ptr& err) } else if (rpc_errs == raft_server::raft_limits_.warning_limit_) { p_wn("too verbose RPC error on peer (%d), " "will suppress it from now", peer_id); - if (!pp || !pp->IsLost()) { + if (!pp || !pp->is_lost()) { if (pp) { - pp->SetLost(); + pp->set_lost(); } cb_func::Param param(id_, leader_, peer_id); const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, ¶m); @@ -863,9 +861,7 @@ void raft_server::handle_peer_resp(ptr& resp, ptr& err) peer* pp = entry->second.get(); int rpc_errs = pp->get_rpc_errs(); if (rpc_errs >= raft_server::raft_limits_.warning_limit_) { - if (pp) { - pp->SetRecovered(); - } + pp->set_recovered(); p_wn("recovered from RPC failure from peer %d, %d errors", resp->get_src(), rpc_errs); } @@ -1134,13 +1130,13 @@ bool raft_server::check_leadership_validity() { const auto nr_peers_list = get_not_responding_peers(); assert(nr_peers_list.size() == nr_peers); for (auto& peer : nr_peers_list) { - if (peer->IsLost()) { - continue; - } - peer->SetLost(); - cb_func::Param param(id_, leader_, peer->get_id()); - const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, ¶m); - assert(rc == cb_func::ReturnCode::Ok); + if (peer->is_lost()) { + continue; + } + peer->set_lost(); + cb_func::Param param(id_, leader_, peer->get_id()); + const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, ¶m); + assert(rc == cb_func::ReturnCode::Ok); } // NOTE: From b2d0b07be9743e54d9bf0841893e45eb4a7f94aa Mon Sep 17 00:00:00 2001 From: Pavel Yurin Date: Wed, 26 Jun 2024 13:32:07 +0800 Subject: [PATCH 3/4] Fixed a warning "comparison of integer expressions of different signedness" --- src/raft_server.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/raft_server.cxx b/src/raft_server.cxx index ca89c24f..839e3eda 100644 --- a/src/raft_server.cxx +++ b/src/raft_server.cxx @@ -1128,7 +1128,7 @@ bool raft_server::check_leadership_validity() { min_quorum_size); const auto nr_peers_list = get_not_responding_peers(); - assert(nr_peers_list.size() == nr_peers); + assert(nr_peers_list.size() == static_cast(nr_peers)); for (auto& peer : nr_peers_list) { if (peer->is_lost()) { continue; From e45952ab5ecdc628633dc3d079746f19ce209097 Mon Sep 17 00:00:00 2001 From: Jung-Sang Ahn Date: Thu, 18 Jul 2024 16:20:24 -0700 Subject: [PATCH 4/4] [Update PR] Add comments and put more set_recovered() calls --- include/libnuraft/peer.hxx | 7 ++++++- src/raft_server.cxx | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/include/libnuraft/peer.hxx b/include/libnuraft/peer.hxx index ca20e88d..62d67739 100644 --- a/include/libnuraft/peer.hxx +++ b/include/libnuraft/peer.hxx @@ -74,6 +74,7 @@ public: , reconn_backoff_(0) , suppress_following_error_(false) , abandoned_(false) + , lost_by_leader_(false) , rsv_msg_(nullptr) , rsv_msg_handler_(nullptr) , l_(logger) @@ -502,7 +503,11 @@ private: */ std::atomic abandoned_; - std::atomic lost_by_leader_ {false}; + /** + * If `true`, this peer is considered unresponsive + * and treated as if it has been lost. + */ + std::atomic lost_by_leader_; /** * Reserved message that should be sent next time. diff --git a/src/raft_server.cxx b/src/raft_server.cxx index 839e3eda..7fd00a37 100644 --- a/src/raft_server.cxx +++ b/src/raft_server.cxx @@ -861,10 +861,10 @@ void raft_server::handle_peer_resp(ptr& resp, ptr& err) peer* pp = entry->second.get(); int rpc_errs = pp->get_rpc_errs(); if (rpc_errs >= raft_server::raft_limits_.warning_limit_) { - pp->set_recovered(); p_wn("recovered from RPC failure from peer %d, %d errors", resp->get_src(), rpc_errs); } + pp->set_recovered(); pp->reset_rpc_errs(); pp->reset_resp_timer(); } @@ -1044,6 +1044,7 @@ void raft_server::become_leader() { pp->set_next_log_idx(log_store_->next_slot()); enable_hb_for_peer(*pp); + pp->set_recovered(); } // If there are uncommitted logs, search if conf log exists.