From d126728781dc6ee1d41e500e9cacd4b65df638c9 Mon Sep 17 00:00:00 2001 From: Chao Chen Date: Mon, 9 Oct 2023 08:11:05 -0700 Subject: [PATCH] raft loop prober with counter Signed-off-by: Chao Chen --- server/etcdserver/api/etcdhttp/health.go | 31 ++++++++++++++++++- server/etcdserver/api/etcdhttp/health_test.go | 2 ++ server/etcdserver/raft.go | 13 ++++++-- server/etcdserver/server.go | 9 ++++-- 4 files changed, 50 insertions(+), 5 deletions(-) diff --git a/server/etcdserver/api/etcdhttp/health.go b/server/etcdserver/api/etcdhttp/health.go index 234580805b4..44b0aa5d79f 100644 --- a/server/etcdserver/api/etcdhttp/health.go +++ b/server/etcdserver/api/etcdhttp/health.go @@ -22,9 +22,12 @@ import ( "net/http" "path" "strings" + "sync" + "time" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "golang.org/x/time/rate" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/types" @@ -36,6 +39,8 @@ import ( const ( PathHealth = "/health" PathProxyHealth = "/proxy/health" + + LivezRaftLoopDeadLockCheckInterval = 5 * time.Second ) type ServerHealth interface { @@ -44,6 +49,7 @@ type ServerHealth interface { Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error) Config() config.ServerConfig AuthStore() auth.AuthStore + TickElapsed() uint64 } // HandleHealth registers metrics and health handlers. it checks health by using v3 range request @@ -203,11 +209,19 @@ type HealthCheck func(ctx context.Context) error type CheckRegistry struct { path string checks map[string]HealthCheck + + // tickElapsed and rate limiter is only used in livez check registry. + mu sync.Mutex + lastRaftTickElapsed uint64 + + rateLimiter *rate.Limiter } func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { - reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck)} + rl := rate.NewLimiter(rate.Every(LivezRaftLoopDeadLockCheckInterval), 1) + reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck), rateLimiter: rl} reg.Register("serializable_read", serializableReadCheck(server)) + reg.Register("raft_loop_progress", raftLoopDeadLockCheck(server, ®)) reg.InstallHttpEndpoints(lg, mux) } @@ -365,3 +379,18 @@ func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error { return nil } } + +func raftLoopDeadLockCheck(srv ServerHealth, reg *CheckRegistry) func(ctx context.Context) error { + return func(ctx context.Context) error { + if reg.rateLimiter.Allow() { + tickElapsed := srv.TickElapsed() + reg.mu.Lock() + defer reg.mu.Unlock() + if tickElapsed <= reg.lastRaftTickElapsed { + return fmt.Errorf("raft loop dead lock") + } + reg.lastRaftTickElapsed = tickElapsed + } + return nil + } +} diff --git a/server/etcdserver/api/etcdhttp/health_test.go b/server/etcdserver/api/etcdhttp/health_test.go index 122fbf6adcf..9882634270e 100644 --- a/server/etcdserver/api/etcdhttp/health_test.go +++ b/server/etcdserver/api/etcdhttp/health_test.go @@ -60,6 +60,8 @@ func (s *fakeHealthServer) Leader() types.ID { func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore } +func (s *fakeHealthServer) TickElapsed() uint64 { return 1 } + func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false } type healthTestCase struct { diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 2a315ea5865..cd447b59f5d 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -23,12 +23,13 @@ import ( "go.uber.org/zap" + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" + "go.etcd.io/etcd/client/pkg/v3/logutil" "go.etcd.io/etcd/pkg/v3/contention" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" serverstorage "go.etcd.io/etcd/server/v3/storage" - "go.etcd.io/raft/v3" - "go.etcd.io/raft/v3/raftpb" ) const ( @@ -82,6 +83,7 @@ type raftNode struct { tickMu *sync.Mutex raftNodeConfig + tickElapsed uint64 // a chan to send/receive snapshot msgSnapC chan raftpb.Message @@ -155,9 +157,16 @@ func newRaftNode(cfg raftNodeConfig) *raftNode { func (r *raftNode) tick() { r.tickMu.Lock() r.Tick() + r.tickElapsed++ r.tickMu.Unlock() } +func (r *raftNode) safeReadTickElapsed() uint64 { + r.tickMu.Lock() + defer r.tickMu.Unlock() + return r.tickElapsed +} + // start prepares and starts raftNode in a new goroutine. It is no longer safe // to modify the fields after it has been started. func (r *raftNode) start(rh *raftReadyHandler) { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 1d48fa6732c..b64ede213a6 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -33,6 +33,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/api/v3/version" @@ -67,8 +70,6 @@ import ( "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/schema" - "go.etcd.io/raft/v3" - "go.etcd.io/raft/v3/raftpb" ) const ( @@ -1643,6 +1644,10 @@ func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() } func (s *EtcdServer) Term() uint64 { return s.getTerm() } +// TickElapsed returns the raft tick elapsed counter. +// It is used to check if etcdserver raft loop is deadlocked. +func (s *EtcdServer) TickElapsed() uint64 { return s.r.safeReadTickElapsed() } + type confChangeResponse struct { membs []*membership.Member raftAdvanceC <-chan struct{}