Skip to content

Commit

Permalink
etcdserver: add readyz check for the read index
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <[email protected]>
  • Loading branch information
siyuanfoundation committed Oct 17, 2023
1 parent aef295c commit 2461ffd
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 17 deletions.
9 changes: 9 additions & 0 deletions server/etcdserver/api/etcdhttp/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ServerHealth interface {
Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error)
Config() config.ServerConfig
AuthStore() auth.AuthStore
GetReadIndex(ctx context.Context) (uint64, error)
}

// HandleHealth registers metrics and health handlers. it checks health by using v3 range request
Expand Down Expand Up @@ -215,6 +216,7 @@ func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHea
reg := CheckRegistry{path: "/readyz", checks: make(map[string]HealthCheck)}
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
reg.Register("serializable_read", serializableReadCheck(server))
reg.Register("read_index", readIndexCheck(server))
reg.InstallHttpEndpoints(lg, mux)
}

Expand Down Expand Up @@ -365,3 +367,10 @@ func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
return nil
}
}

func readIndexCheck(srv ServerHealth) func(ctx context.Context) error {
return func(ctx context.Context) error {
_, err := srv.GetReadIndex(ctx)
return err
}
}
52 changes: 46 additions & 6 deletions server/etcdserver/api/etcdhttp/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ import (

type fakeHealthServer struct {
fakeServer
apiError error
missingLeader bool
authStore auth.AuthStore
apiError error
missingLeader bool
authStore auth.AuthStore
readIndexError error
}

func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
Expand All @@ -58,6 +59,10 @@ func (s *fakeHealthServer) Leader() types.ID {
return types.ID(raft.None)
}

func (s *fakeHealthServer) GetReadIndex(ctx context.Context) (uint64, error) {
return 0, s.readIndexError
}

func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore }

func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false }
Expand All @@ -69,9 +74,10 @@ type healthTestCase struct {
inResult []string
notInResult []string

alarms []*pb.AlarmMember
apiError error
missingLeader bool
alarms []*pb.AlarmMember
apiError error
readIndexError error
missingLeader bool
}

func TestHealthHandler(t *testing.T) {
Expand Down Expand Up @@ -295,6 +301,40 @@ func TestSerializableReadCheck(t *testing.T) {
}
}

func TestReadIndexCheck(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
tests := []healthTestCase{
{
name: "Alive ok",
healthCheckURL: "/livez",
readIndexError: fmt.Errorf("failed to get read index"),
expectStatusCode: http.StatusOK,
},
{
name: "Not ready if read index error",
healthCheckURL: "/readyz",
readIndexError: fmt.Errorf("failed to get read index"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]read_index failed: failed to get read index"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
readIndexError: tt.readIndexError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
})
}
}

func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})

Expand Down
10 changes: 10 additions & 0 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ type EtcdServer struct {
// when there is no error
readNotifier *notifier

// GetReadIndex notifies etcd server that it waits for the read index by sending an empty struct to
// readIndexWaitc
readIndexWaitc chan struct{}
// readIndexNotifier is used to notify GetReadIndex when it is able to get the read index
// when there is no error
readIndexNotifier *notifier

// stop signals the run goroutine should shutdown.
stop chan struct{}
// stopping is closed by run goroutine on shutdown.
Expand Down Expand Up @@ -528,6 +535,7 @@ func (s *EtcdServer) Start() {
s.GoAttach(s.monitorClusterVersions)
s.GoAttach(s.monitorStorageVersion)
s.GoAttach(s.linearizableReadLoop)
s.GoAttach(s.readIndexLoop)
s.GoAttach(s.monitorKVHash)
s.GoAttach(s.monitorCompactHash)
s.GoAttach(s.monitorDowngrade)
Expand Down Expand Up @@ -564,6 +572,8 @@ func (s *EtcdServer) start() {
s.ctx, s.cancel = context.WithCancel(context.Background())
s.readwaitc = make(chan struct{}, 1)
s.readNotifier = newNotifier()
s.readIndexWaitc = make(chan struct{}, 1)
s.readIndexNotifier = newNotifier()
s.leaderChanged = notify.NewNotifier()
if s.ClusterVersion() != nil {
lg.Info(
Expand Down
3 changes: 3 additions & 0 deletions server/etcdserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool
type notifier struct {
c chan struct{}
err error

// pass some values in the notifier
uint64Val uint64
}

func newNotifier() *notifier {
Expand Down
63 changes: 56 additions & 7 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,16 +780,11 @@ func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

func (s *EtcdServer) linearizableReadLoop() {
for {
requestId := s.reqIDGen.Next()
leaderChangedNotifier := s.leaderChanged.Receive()
select {
case <-leaderChangedNotifier:
continue
case <-s.readwaitc:
case <-s.stopping:
return
}

// as a single loop is can unlock multiple reads, it is not very useful
// to propagate the trace from Txn or Range.
trace := traceutil.New("linearizableReadLoop", s.Logger())
Expand All @@ -799,8 +794,9 @@ func (s *EtcdServer) linearizableReadLoop() {
nr := s.readNotifier
s.readNotifier = nextnr
s.readMu.Unlock()

confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
confirmedIndex, err := s.GetReadIndex(ctx)
cancel()
if isStopped(err) {
return
}
Expand Down Expand Up @@ -831,6 +827,59 @@ func (s *EtcdServer) linearizableReadLoop() {
}
}

func (s *EtcdServer) readIndexLoop() {
for {
requestId := s.reqIDGen.Next()
leaderChangedNotifier := s.leaderChanged.Receive()
select {
case <-leaderChangedNotifier:
continue
case <-s.readIndexWaitc:
case <-s.stopping:
return
}

nextnr := newNotifier()
s.readMu.Lock()
nr := s.readIndexNotifier
s.readIndexNotifier = nextnr
s.readMu.Unlock()

confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
if isStopped(err) {
return
}
if err != nil {
nr.notify(err)
continue
}
nr.uint64Val = confirmedIndex
// unblock all read index requested at indices before confirmedIndex
nr.notify(nil)
}
}

func (s *EtcdServer) GetReadIndex(ctx context.Context) (uint64, error) {
s.readMu.RLock()
nc := s.readIndexNotifier
s.readMu.RUnlock()

// signal linearizable loop for current notify if it hasn't been already
select {
case s.readIndexWaitc <- struct{}{}:
default:
}
// wait for read state notification
select {
case <-nc.c:
return nc.uint64Val, nc.err
case <-ctx.Done():
return 0, ctx.Err()
case <-s.done:
return 0, errors.ErrStopped
}
}

func isStopped(err error) bool {
return err == raft.ErrStopped || err == errors.ErrStopped
}
Expand Down
9 changes: 5 additions & 4 deletions tests/e2e/http_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ func TestHTTPLivezReadyzHandler(t *testing.T) {
expectedStatusCode: http.StatusOK,
},
{
url: "/readyz",
expectedStatusCode: http.StatusOK,
url: "/readyz",
expectedTimeoutError: true,
expectedStatusCode: http.StatusServiceUnavailable,
},
},
},
Expand All @@ -243,8 +244,8 @@ func TestHTTPLivezReadyzHandler(t *testing.T) {
expectedStatusCode: http.StatusOK,
},
{
url: "/readyz",
expectedStatusCode: http.StatusOK,
url: "/readyz",
expectedTimeoutError: true,
},
},
},
Expand Down

0 comments on commit 2461ffd

Please sign in to comment.