Skip to content

Commit

Permalink
Merge branch 'leader-stepdown-v2' from Denis
Browse files Browse the repository at this point in the history
We want a raft leader to step down when it's disconnected from peers to
avoid availability hit in case it's still connected to the client.
  • Loading branch information
rystsov committed Nov 3, 2020
2 parents db86b23 + 7ff2d7c commit 0e2d5ec
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/rfcs/20200421_raft_recovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ In order to make it possible new fields have to be added to
// next index to send to this follower
model::offset next_index;
// timestamp of last append_entries_rpc call
clock_type::time_point last_hbeat_timestamp;
clock_type::time_point last_append_timestamp;
uint64_t failed_appends{0};
bool is_learner = false;
bool is_recovering = false;
Expand Down
37 changes: 35 additions & 2 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ consensus::consensus(
setup_metrics();
update_follower_stats(_configuration_manager.get_latest());
_vote_timeout.set_callback([this] {
maybe_step_down();
dispatch_flush_with_lock();
dispatch_vote(false);
});
Expand Down Expand Up @@ -89,6 +90,31 @@ void consensus::do_step_down() {
_vstate = vote_state::follower;
}

void consensus::maybe_step_down() {
(void)ss::with_gate(_bg, [this] {
return _op_lock.with([this] {
if (_vstate == vote_state::leader) {
auto majority_hbeat = config().quorum_match(
[this](model::node_id id) {
if (id == _self) {
return clock_type::now();
}

return _fstats.get(id).last_hbeat_timestamp;
});

if (majority_hbeat < _became_leader_at) {
majority_hbeat = _became_leader_at;
}

if (majority_hbeat + _jit.base_duration() < clock_type::now()) {
do_step_down();
}
}
});
});
}

ss::future<> consensus::stop() {
vlog(_ctxlog.info, "Stopping");
_vote_timeout.cancel();
Expand Down Expand Up @@ -178,6 +204,8 @@ consensus::success_reply consensus::update_follower_index(
return success_reply::no;
}

update_node_hbeat_timestamp(node);

// If recovery is in progress the recovery STM will handle follower index
// updates
if (!idx.is_recovering) {
Expand Down Expand Up @@ -1377,8 +1405,13 @@ model::term_id consensus::get_term(model::offset o) {
return _log.get_term(o).value_or(model::term_id{});
}

clock_type::time_point consensus::last_hbeat_timestamp(model::node_id id) {
return _fstats.get(id).last_hbeat_timestamp;
clock_type::time_point consensus::last_append_timestamp(model::node_id id) {
return _fstats.get(id).last_append_timestamp;
}

void consensus::update_node_append_timestamp(model::node_id id) {
_fstats.get(id).last_append_timestamp = clock_type::now();
update_node_hbeat_timestamp(id);
}

void consensus::update_node_hbeat_timestamp(model::node_id id) {
Expand Down
7 changes: 6 additions & 1 deletion src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class consensus {
replace_configuration(std::vector<model::broker>);

bool is_leader() const { return _vstate == vote_state::leader; }
bool is_candidate() const { return _vstate == vote_state::candidate; }
std::optional<model::node_id> get_leader_id() const { return _leader_id; }
model::node_id self() const { return _self; }
protocol_metadata meta() const {
Expand All @@ -100,7 +101,7 @@ class consensus {
const model::ntp& ntp() const { return _log.config().ntp(); }
clock_type::time_point last_heartbeat() const { return _hbeat; };

clock_type::time_point last_hbeat_timestamp(model::node_id);
clock_type::time_point last_append_timestamp(model::node_id);
/**
* \brief Persist snapshot with given data and start offset
*
Expand Down Expand Up @@ -264,6 +265,7 @@ class consensus {
ss::future<> maybe_update_follower_commit_idx(model::offset);

void arm_vote_timeout();
void update_node_append_timestamp(model::node_id);
void update_node_hbeat_timestamp(model::node_id);

void update_follower_stats(const group_configuration&);
Expand All @@ -275,6 +277,8 @@ class consensus {
/// the ops semaphore
void dispatch_flush_with_lock();

void maybe_step_down();

absl::flat_hash_map<model::node_id, follower_req_seq>
next_followers_request_seq();

Expand Down Expand Up @@ -312,6 +316,7 @@ class consensus {

/// useful for when we are not the leader
clock_type::time_point _hbeat = clock_type::now();
clock_type::time_point _became_leader_at = clock_type::now();
/// used to keep track if we are a leader, or transitioning
vote_state _vstate = vote_state::follower;
/// used for votes only. heartbeats are done by heartbeat_manager
Expand Down
6 changes: 3 additions & 3 deletions src/v/raft/heartbeat_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ static std::vector<heartbeat_manager::node_heartbeat> requests_for_range(
return;
}

auto last_hbeat_timestamp = ptr->last_hbeat_timestamp(n.id());
auto last_append_timestamp = ptr->last_append_timestamp(n.id());

if (last_hbeat_timestamp > last_heartbeat) {
if (last_append_timestamp > last_heartbeat) {
vlog(
hbeatlog.trace,
"Skipping sending beat to {} gr: {} last hb {}, last append "
"{}",
n.id(),
ptr->group(),
last_heartbeat.time_since_epoch().count(),
last_hbeat_timestamp.time_since_epoch().count());
last_append_timestamp.time_since_epoch().count());
// we already sent heartbeat, skip it
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/recovery_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ ss::future<> recovery_stm::replicate(model::record_batch_reader&& reader) {
std::move(reader),
append_entries_request::flush_after_append::no);

_ptr->update_node_hbeat_timestamp(_node_id);
_ptr->update_node_append_timestamp(_node_id);

auto seq = _ptr->next_follower_sequence(_node_id);
return dispatch_append_entries(std::move(r)).then([this, seq](auto r) {
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/replicate_entries_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ clock_type::time_point replicate_entries_stm::append_entries_timeout() {
ss::future<result<append_entries_reply>>
replicate_entries_stm::send_append_entries_request(
model::node_id n, append_entries_request req) {
_ptr->update_node_hbeat_timestamp(n);
_ptr->update_node_append_timestamp(n);
vlog(_ctxlog.trace, "Sending append entries request {} to {}", req.meta, n);

auto f = _ptr->_client_protocol.append_entries(
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct follower_index_metadata {
// next index to send to this follower
model::offset next_index;
// timestamp of last append_entries_rpc call
clock_type::time_point last_append_timestamp;
clock_type::time_point last_hbeat_timestamp;
uint64_t failed_appends{0};
// The pair of sequences used to track append entries requests sent and
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/vote_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ void vote_stm::update_vote_state(ss::semaphore_units<> u) {
// section vote:5.2.2
_ptr->_vstate = consensus::vote_state::leader;
_ptr->_leader_id = _ptr->self();
_ptr->_became_leader_at = clock_type::now();
// Set last heartbeat timestamp to max as we are the leader
_ptr->_hbeat = clock_type::time_point::max();
vlog(_ctxlog.info, "became the leader term:{}", _ptr->term());
Expand Down

0 comments on commit 0e2d5ec

Please sign in to comment.