Skip to content

Commit

Permalink
Merge pull request hashicorp#494 from hashicorp/hridoyroy/heartbeat-b…
Browse files Browse the repository at this point in the history
…ackoff

Cap maximum grpc wait time when heartbeating to heartbeatTimeout/2
  • Loading branch information
HridoyRoy authored May 9, 2022
2 parents 5157c19 + 81a7d27 commit 9174562
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 2 deletions.
59 changes: 59 additions & 0 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2644,6 +2644,65 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) {
}
}

// TestRaft_FollowerRemovalNoElection ensures that a leader election is not
// started when a standby is shut down and restarted.
func TestRaft_FollowerRemovalNoElection(t *testing.T) {
// Make a cluster
inmemConf := inmemConfig(t)
inmemConf.HeartbeatTimeout = 100 * time.Millisecond
inmemConf.ElectionTimeout = 100 * time.Millisecond
c := MakeCluster(3, t, inmemConf)

defer c.Close()
waitForLeader(c)

leader := c.Leader()

// Wait until we have 2 followers
limit := time.Now().Add(c.longstopTimeout)
var followers []*Raft
for time.Now().Before(limit) && len(followers) != 2 {
c.WaitEvent(nil, c.conf.CommitTimeout)
followers = c.GetInState(Follower)
}
if len(followers) != 2 {
t.Fatalf("expected two followers: %v", followers)
}

// Disconnect one of the followers and wait for the heartbeat timeout
i := 0
follower := c.rafts[i]
if follower == c.Leader() {
i = 1
follower = c.rafts[i]
}
logs := follower.logs
t.Logf("[INFO] restarting %v", follower)
// Shutdown follower
if f := follower.Shutdown(); f.Error() != nil {
t.Fatalf("error shuting down follower: %v", f.Error())
}

_, trans := NewInmemTransport(follower.localAddr)
conf := follower.config()
n, err := NewRaft(&conf, &MockFSM{}, logs, follower.stable, follower.snapshots, trans)
if err != nil {
t.Fatalf("error restarting follower: %v", err)
}
c.rafts[i] = n
c.trans[i] = n.trans.(*InmemTransport)
c.fsms[i] = n.fsm.(*MockFSM)
c.FullyConnect()
// There should be no re-election during this sleep
time.Sleep(250 * time.Millisecond)

// Let things settle and make sure we recovered.
c.EnsureLeader(t, leader.localAddr)
c.EnsureSame(t)
c.EnsureSamePeers(t)
n.Shutdown()
}

func TestRaft_VoteWithNoIDNoAddr(t *testing.T) {
// Make a cluster
c := MakeCluster(3, t, nil)
Expand Down
7 changes: 5 additions & 2 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,15 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {

start := time.Now()
if err := r.trans.AppendEntries(peer.ID, peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to heartbeat to", "peer", peer.Address, "error", err)
nextBackoffTime := cappedExponentialBackoff(failureWait, failures, maxFailureScale, r.config().HeartbeatTimeout/2)
r.logger.Error("failed to heartbeat to", "peer", peer.Address, "backoff time",
nextBackoffTime, "error", err)
r.observe(FailedHeartbeatObservation{PeerID: peer.ID, LastContact: s.LastContact()})
failures++
select {
case <-time.After(backoff(failureWait, failures, maxFailureScale)):
case <-time.After(nextBackoffTime):
case <-stopCh:
return
}
} else {
if failures > 0 {
Expand Down
17 changes: 17 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,23 @@ func backoff(base time.Duration, round, limit uint64) time.Duration {
return base
}

// cappedExponentialBackoff computes the exponential backoff with an adjustable
// cap on the max timeout.
func cappedExponentialBackoff(base time.Duration, round, limit uint64, cap time.Duration) time.Duration {
power := min(round, limit)
for power > 2 {
if base > cap {
return cap
}
base *= 2
power--
}
if base > cap {
return cap
}
return base
}

// Needed for sorting []uint64, used to determine commitment
type uint64Slice []uint64

Expand Down

0 comments on commit 9174562

Please sign in to comment.