diff --git a/include/libnuraft/callback.hxx b/include/libnuraft/callback.hxx index d75b255b..de40f327 100644 --- a/include/libnuraft/callback.hxx +++ b/include/libnuraft/callback.hxx @@ -207,6 +207,13 @@ public: */ ResignationFromLeader = 27, + /** + * When a peer RPC errors count exceeds raft_server::limits.warning_limit_, or + * a peer doesn't respond for a long time (raft_params::leadership_expiry_), + * the peer is considered lost. + * ctx: null. + */ + FollowerLost = 28, }; struct Param { diff --git a/include/libnuraft/peer.hxx b/include/libnuraft/peer.hxx index 895d313a..62d67739 100644 --- a/include/libnuraft/peer.hxx +++ b/include/libnuraft/peer.hxx @@ -74,6 +74,7 @@ public: , reconn_backoff_(0) , suppress_following_error_(false) , abandoned_(false) + , lost_by_leader_(false) , rsv_msg_(nullptr) , rsv_msg_handler_(nullptr) , l_(logger) @@ -302,6 +303,10 @@ public: ptr get_rsv_msg() const { return rsv_msg_; } rpc_handler get_rsv_msg_handler() const { return rsv_msg_handler_; } + bool is_lost() const { return lost_by_leader_; } + void set_lost() { lost_by_leader_ = true; } + void set_recovered() { lost_by_leader_ = false; } + private: void handle_rpc_result(ptr myself, ptr my_rpc_client, @@ -498,6 +503,12 @@ private: */ std::atomic abandoned_; + /** + * If `true`, this peer is considered unresponsive + * and treated as if it has been lost. + */ + std::atomic lost_by_leader_; + /** * Reserved message that should be sent next time. */ diff --git a/include/libnuraft/raft_server.hxx b/include/libnuraft/raft_server.hxx index 70faf49d..fe6ffceb 100644 --- a/include/libnuraft/raft_server.hxx +++ b/include/libnuraft/raft_server.hxx @@ -886,9 +886,12 @@ protected: int32 get_quorum_for_election(); int32 get_quorum_for_commit(); int32 get_leadership_expiry(); - size_t get_not_responding_peers(); + std::list> get_not_responding_peers(); + size_t get_not_responding_peers_count(); size_t get_num_stale_peers(); + void apply_to_not_responding_peers(const std::function&)>&); + ptr handle_append_entries(req_msg& req); ptr handle_prevote_req(req_msg& req); ptr handle_vote_req(req_msg& req); diff --git a/src/handle_append_entries.cxx b/src/handle_append_entries.cxx index 0318487b..00461e00 100644 --- a/src/handle_append_entries.cxx +++ b/src/handle_append_entries.cxx @@ -162,7 +162,7 @@ bool raft_server::request_append_entries(ptr p) { chk_timer.timeout_and_reset() ) { // If auto adjust mode is on for 2-node cluster, and // the follower is not responding, adjust the quorum. - size_t num_not_responding_peers = get_not_responding_peers(); + size_t num_not_responding_peers = get_not_responding_peers_count(); size_t cur_quorum_size = get_quorum_for_commit(); size_t num_stale_peers = get_num_stale_peers(); if (cur_quorum_size >= 1) { @@ -1191,7 +1191,7 @@ ulong raft_server::get_expected_committed_log_idx() { size_t quorum_idx = get_quorum_for_commit(); if (ctx_->get_params()->use_full_consensus_among_healthy_members_) { - size_t not_responding_peers = get_not_responding_peers(); + size_t not_responding_peers = get_not_responding_peers_count(); if (not_responding_peers < voting_members - quorum_idx) { // If full consensus option is on, commit should be // agreed by all healthy members, and the number of diff --git a/src/raft_server.cxx b/src/raft_server.cxx index e331a319..7fd00a37 100644 --- a/src/raft_server.cxx +++ b/src/raft_server.cxx @@ -608,27 +608,43 @@ int32 raft_server::get_leadership_expiry() { return expiry; } -size_t raft_server::get_not_responding_peers() { - // Check if quorum nodes are not responding - // (i.e., don't respond 20x heartbeat time long). +std::list> raft_server::get_not_responding_peers() { + std::list> rs; + auto cb = [&rs](const ptr& peer_ptr) { + rs.push_back(peer_ptr); + }; + apply_to_not_responding_peers(cb); + return rs; +} + +size_t raft_server::get_not_responding_peers_count() { size_t num_not_resp_nodes = 0; + auto cb = [&num_not_resp_nodes](const ptr&) { + ++num_not_resp_nodes; + }; + apply_to_not_responding_peers(cb); + return num_not_resp_nodes; +} +void raft_server::apply_to_not_responding_peers( + const std::function&)>& callback) { + // Check if quorum nodes are not responding + // (i.e., don't respond 20x heartbeat time long). ptr params = ctx_->get_params(); - int expiry = params->heart_beat_interval_ * - raft_server::raft_limits_.response_limit_; + int expiry = params->heart_beat_interval_ * raft_server::raft_limits_.response_limit_; - // Check the number of not responding peers. + // Check not responding peers. for (auto& entry: peers_) { - ptr p = entry.second; + const auto& peer_ptr = entry.second; - if (!is_regular_member(p)) continue; + if (!is_regular_member(peer_ptr)) continue; - int32 resp_elapsed_ms = (int32)(p->get_resp_timer_us() / 1000); - if ( resp_elapsed_ms > expiry ) { - num_not_resp_nodes++; + const auto resp_elapsed_ms = + static_cast(peer_ptr->get_resp_timer_us() / 1000); + if (resp_elapsed_ms > expiry) { + callback(peer_ptr); } } - return num_not_resp_nodes; } size_t raft_server::get_num_stale_peers() { @@ -804,6 +820,14 @@ void raft_server::handle_peer_resp(ptr& resp, ptr& err) } else if (rpc_errs == raft_server::raft_limits_.warning_limit_) { p_wn("too verbose RPC error on peer (%d), " "will suppress it from now", peer_id); + if (!pp || !pp->is_lost()) { + if (pp) { + pp->set_lost(); + } + cb_func::Param param(id_, leader_, peer_id); + const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, ¶m); + assert(rc == cb_func::ReturnCode::Ok); + } } if (pp && pp->is_leave_flag_set()) { @@ -840,6 +864,7 @@ void raft_server::handle_peer_resp(ptr& resp, ptr& err) p_wn("recovered from RPC failure from peer %d, %d errors", resp->get_src(), rpc_errs); } + pp->set_recovered(); pp->reset_rpc_errs(); pp->reset_resp_timer(); } @@ -1019,6 +1044,7 @@ void raft_server::become_leader() { pp->set_next_log_idx(log_store_->next_slot()); enable_hb_for_peer(*pp); + pp->set_recovered(); } // If there are uncommitted logs, search if conf log exists. @@ -1085,7 +1111,7 @@ bool raft_server::check_leadership_validity() { int32 num_voting_members = get_num_voting_members(); int leadership_expiry = get_leadership_expiry(); - int32 nr_peers = (int32)get_not_responding_peers(); + int32 nr_peers = (int32)get_not_responding_peers_count(); if (leadership_expiry < 0) { // Negative expiry: leadership will never expire. nr_peers = 0; @@ -1102,6 +1128,18 @@ bool raft_server::check_leadership_validity() { get_leadership_expiry(), min_quorum_size); + const auto nr_peers_list = get_not_responding_peers(); + assert(nr_peers_list.size() == static_cast(nr_peers)); + for (auto& peer : nr_peers_list) { + if (peer->is_lost()) { + continue; + } + peer->set_lost(); + cb_func::Param param(id_, leader_, peer->get_id()); + const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, ¶m); + assert(rc == cb_func::ReturnCode::Ok); + } + // NOTE: // For a cluster where the number of members is the same // as the size of quorum, we should not expire leadership,