diff --git a/etcdserver/raft.go b/etcdserver/raft.go
index 9322aa6c508..5b03322c2be 100644
--- a/etcdserver/raft.go
+++ b/etcdserver/raft.go
@@ -79,7 +79,9 @@ type apply struct {
 type raftNode struct {
 	lg *zap.Logger
 
-	tickMu *sync.Mutex
+	tickMu *sync.RWMutex
+	// timestamp of the latest tick
+	latestTickTs time.Time
 	raftNodeConfig
 
 	// a chan to send/receive snapshot
@@ -131,8 +133,9 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
 	raft.SetLogger(lg)
 	r := &raftNode{
 		lg:             cfg.lg,
-		tickMu:         new(sync.Mutex),
+		tickMu:         new(sync.RWMutex),
 		raftNodeConfig: cfg,
+		latestTickTs:   time.Now(),
 		// set up contention detectors for raft heartbeat message.
 		// expect to send a heartbeat within 2 heartbeat intervals.
 		td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
@@ -154,9 +157,16 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
 func (r *raftNode) tick() {
 	r.tickMu.Lock()
 	r.Tick()
+	r.latestTickTs = time.Now()
 	r.tickMu.Unlock()
 }
 
+func (r *raftNode) getLatestTickTs() time.Time {
+	r.tickMu.RLock()
+	defer r.tickMu.RUnlock()
+	return r.latestTickTs
+}
+
 // start prepares and starts raftNode in a new goroutine. It is no longer safe
 // to modify the fields after it has been started.
 func (r *raftNode) start(rh *raftReadyHandler) {
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 55ce02b03a0..9264dea4fcf 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -1174,10 +1174,32 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
 	})
 }
 
+// isActive checks if the etcd instance is still actively processing the
+// heartbeat message (ticks). It returns false if no heartbeat has been
+// received within 3 * tickMs.
+func (s *EtcdServer) isActive() bool {
+	latestTickTs := s.r.getLatestTickTs()
+	threshold := 3 * time.Duration(s.Cfg.TickMs) * time.Millisecond
+	return latestTickTs.Add(threshold).After(time.Now())
+}
+
 // ensureLeadership checks whether current member is still the leader.
 func (s *EtcdServer) ensureLeadership() bool {
 	lg := s.Logger()
 
+	if s.isActive() {
+		if lg != nil {
+			lg.Debug("The member is active, skip checking leadership",
+				zap.Time("latestTickTs", s.r.getLatestTickTs()),
+				zap.Time("now", time.Now()))
+		} else {
+			plog.Debugf("The member is active, skip checking leadership, latestTickTs: %s, now: %s",
+				s.r.getLatestTickTs(), time.Now())
+		}
+
+		return true
+	}
+
 	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
 	defer cancel()
 	if err := s.linearizableReadNotify(ctx); err != nil {
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index 7210d153408..328a67a551d 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -30,6 +30,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"go.etcd.io/etcd/etcdserver/api/membership"
 	"go.etcd.io/etcd/etcdserver/api/rafthttp"
 	"go.etcd.io/etcd/etcdserver/api/snap"
@@ -1995,3 +1996,45 @@ func TestWaitAppliedIndex(t *testing.T) {
 		})
 	}
 }
+
+func TestIsActive(t *testing.T) {
+	cases := []struct {
+		name                  string
+		tickMs                uint
+		durationSinceLastTick time.Duration
+		expectActive          bool
+	}{
+		{
+			name:                  "1.5*tickMs,active",
+			tickMs:                100,
+			durationSinceLastTick: 150 * time.Millisecond,
+			expectActive:          true,
+		},
+		{
+			name:                  "2*tickMs,active",
+			tickMs:                200,
+			durationSinceLastTick: 400 * time.Millisecond,
+			expectActive:          true,
+		},
+		{
+			name:                  "4*tickMs,not active",
+			tickMs:                150,
+			durationSinceLastTick: 600 * time.Millisecond,
+			expectActive:          false,
+		},
+	}
+
+	for _, tc := range cases {
+		s := EtcdServer{
+			Cfg: ServerConfig{
+				TickMs: tc.tickMs,
+			},
+			r: raftNode{
+				tickMu:       new(sync.RWMutex),
+				latestTickTs: time.Now().Add(-tc.durationSinceLastTick),
+			},
+		}
+
+		require.Equal(t, tc.expectActive, s.isActive())
+	}
+}