diff --git a/server/etcdserver/api/etcdhttp/health.go b/server/etcdserver/api/etcdhttp/health.go index 234580805b40..5be949b67b66 100644 --- a/server/etcdserver/api/etcdhttp/health.go +++ b/server/etcdserver/api/etcdhttp/health.go @@ -44,6 +44,7 @@ type ServerHealth interface { Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error) Config() config.ServerConfig AuthStore() auth.AuthStore + GetReadIndex(ctx context.Context) (uint64, error) } // HandleHealth registers metrics and health handlers. it checks health by using v3 range request @@ -215,6 +216,7 @@ func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHea reg := CheckRegistry{path: "/readyz", checks: make(map[string]HealthCheck)} reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT)) reg.Register("serializable_read", serializableReadCheck(server)) + reg.Register("read_index", readIndexCheck(server)) reg.InstallHttpEndpoints(lg, mux) } @@ -365,3 +367,10 @@ func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error { return nil } } + +func readIndexCheck(srv ServerHealth) func(ctx context.Context) error { + return func(ctx context.Context) error { + _, err := srv.GetReadIndex(ctx) + return err + } +} diff --git a/server/etcdserver/api/etcdhttp/health_test.go b/server/etcdserver/api/etcdhttp/health_test.go index 122fbf6adcf5..be4e1b6cdd0b 100644 --- a/server/etcdserver/api/etcdhttp/health_test.go +++ b/server/etcdserver/api/etcdhttp/health_test.go @@ -38,9 +38,10 @@ import ( type fakeHealthServer struct { fakeServer - apiError error - missingLeader bool - authStore auth.AuthStore + apiError error + missingLeader bool + authStore auth.AuthStore + readIndexError error } func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) { @@ -58,6 +59,10 @@ func (s *fakeHealthServer) Leader() types.ID { return types.ID(raft.None) } +func (s *fakeHealthServer) GetReadIndex(ctx context.Context) (uint64, 
error) { + return 0, s.readIndexError +} + func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore } func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false } @@ -69,9 +74,10 @@ type healthTestCase struct { inResult []string notInResult []string - alarms []*pb.AlarmMember - apiError error - missingLeader bool + alarms []*pb.AlarmMember + apiError error + readIndexError error + missingLeader bool } func TestHealthHandler(t *testing.T) { @@ -295,6 +301,40 @@ func TestSerializableReadCheck(t *testing.T) { } } +func TestReadIndexCheck(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "Alive ok", + healthCheckURL: "/livez", + readIndexError: fmt.Errorf("failed to get read index"), + expectStatusCode: http.StatusOK, + }, + { + name: "Not ready if read index error", + healthCheckURL: "/readyz", + readIndexError: fmt.Errorf("failed to get read index"), + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]read_index failed: failed to get read index"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + readIndexError: tt.readIndexError, + authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0), + } + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + }) + } +} + func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) { res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)}) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 4b40e32bada2..4f969b82b380 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ 
-229,6 +229,13 @@ type EtcdServer struct { // when there is no error readNotifier *notifier + // readIndexWaitc is signaled by GetReadIndex (with an empty struct) to tell readIndexLoop that a caller + // is waiting for the read index + readIndexWaitc chan struct{} + // readIndexNotifier is the notifier GetReadIndex callers wait on; readIndexLoop swaps in a fresh one each + // round and notifies the old one with either the confirmed index or an error + readIndexNotifier *notifier + // stop signals the run goroutine should shutdown. stop chan struct{} // stopping is closed by run goroutine on shutdown. @@ -528,6 +535,7 @@ func (s *EtcdServer) Start() { s.GoAttach(s.monitorClusterVersions) s.GoAttach(s.monitorStorageVersion) s.GoAttach(s.linearizableReadLoop) + s.GoAttach(s.readIndexLoop) s.GoAttach(s.monitorKVHash) s.GoAttach(s.monitorCompactHash) s.GoAttach(s.monitorDowngrade) @@ -564,6 +572,8 @@ func (s *EtcdServer) start() { s.ctx, s.cancel = context.WithCancel(context.Background()) s.readwaitc = make(chan struct{}, 1) s.readNotifier = newNotifier() + s.readIndexWaitc = make(chan struct{}, 1) + s.readIndexNotifier = newNotifier() s.leaderChanged = notify.NewNotifier() if s.ClusterVersion() != nil { lg.Info( diff --git a/server/etcdserver/util.go b/server/etcdserver/util.go index fbba5491b071..cf303a40a6b7 100644 --- a/server/etcdserver/util.go +++ b/server/etcdserver/util.go @@ -84,6 +84,9 @@ func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool type notifier struct { c chan struct{} err error + + // uint64Val is a payload handed to waiters; readIndexLoop stores the confirmed index here before notifying + uint64Val uint64 } func newNotifier() *notifier { diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 07d1f546c9f1..549e002f91a3 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -780,16 +780,11 @@ func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } func (s *EtcdServer) linearizableReadLoop() { for { - requestId := s.reqIDGen.Next() - leaderChangedNotifier := s.leaderChanged.Receive() select { - case 
<-leaderChangedNotifier: - continue case <-s.readwaitc: case <-s.stopping: return } - // as a single loop is can unlock multiple reads, it is not very useful // to propagate the trace from Txn or Range. trace := traceutil.New("linearizableReadLoop", s.Logger()) @@ -799,8 +794,9 @@ func (s *EtcdServer) linearizableReadLoop() { nr := s.readNotifier s.readNotifier = nextnr s.readMu.Unlock() - - confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId) + ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) + confirmedIndex, err := s.GetReadIndex(ctx) + cancel() if isStopped(err) { return } @@ -831,6 +827,59 @@ func (s *EtcdServer) linearizableReadLoop() { } } +func (s *EtcdServer) readIndexLoop() { + for { + requestId := s.reqIDGen.Next() + leaderChangedNotifier := s.leaderChanged.Receive() + select { + case <-leaderChangedNotifier: + continue + case <-s.readIndexWaitc: + case <-s.stopping: + return + } + + nextnr := newNotifier() + s.readMu.Lock() + nr := s.readIndexNotifier + s.readIndexNotifier = nextnr + s.readMu.Unlock() + + confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId) + if isStopped(err) { + return + } + if err != nil { + nr.notify(err) + continue + } + nr.uint64Val = confirmedIndex + // notify with a nil error so GetReadIndex callers holding this notifier can return the index stored above + nr.notify(nil) + } +} + +func (s *EtcdServer) GetReadIndex(ctx context.Context) (uint64, error) { + s.readMu.RLock() + nc := s.readIndexNotifier + s.readMu.RUnlock() + + // signal readIndexLoop for the current notifier; the non-blocking send is a no-op if a signal is already pending + select { + case s.readIndexWaitc <- struct{}{}: + default: + } + // wait for the notifier, ctx being done, or s.done (reported as ErrStopped), whichever comes first + select { + case <-nc.c: + return nc.uint64Val, nc.err + case <-ctx.Done(): + return 0, ctx.Err() + case <-s.done: + return 0, errors.ErrStopped + } +} + func isStopped(err error) bool { return err == raft.ErrStopped || err == errors.ErrStopped } diff --git a/tests/e2e/http_health_check_test.go 
b/tests/e2e/http_health_check_test.go index 0ee7e51c08a8..97f4648701bd 100644 --- a/tests/e2e/http_health_check_test.go +++ b/tests/e2e/http_health_check_test.go @@ -225,8 +225,9 @@ func TestHTTPLivezReadyzHandler(t *testing.T) { expectedStatusCode: http.StatusOK, }, { - url: "/readyz", - expectedStatusCode: http.StatusOK, + url: "/readyz", + expectedTimeoutError: true, + expectedStatusCode: http.StatusServiceUnavailable, }, }, }, @@ -243,8 +244,8 @@ func TestHTTPLivezReadyzHandler(t *testing.T) { expectedStatusCode: http.StatusOK, }, { - url: "/readyz", - expectedTimeoutError: true, + url: "/readyz", + expectedTimeoutError: true, }, }, },