diff --git a/server/etcdserver/api/etcdhttp/health.go b/server/etcdserver/api/etcdhttp/health.go index 47191c642d1..234580805b4 100644 --- a/server/etcdserver/api/etcdhttp/health.go +++ b/server/etcdserver/api/etcdhttp/health.go @@ -15,10 +15,13 @@ package etcdhttp import ( + "bytes" "context" "encoding/json" "fmt" "net/http" + "path" + "strings" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -46,7 +49,7 @@ type ServerHealth interface { // HandleHealth registers metrics and health handlers. it checks health by using v3 range request // and its corresponding timeout. func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) { - mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms AlarmSet, serializable bool) Health { + mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health { if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" { return h } @@ -55,10 +58,13 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) { } return checkAPI(ctx, lg, srv, serializable) })) + + installLivezEndpoints(lg, mux, srv) + installReadyzEndpoints(lg, mux, srv) } // NewHealthHandler handles '/health' requests. -func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAlarms AlarmSet, Serializable bool) Health) http.HandlerFunc { +func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAlarms StringSet, Serializable bool) Health) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.Header().Set("Allow", http.MethodGet) @@ -66,7 +72,7 @@ func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAl lg.Warn("/health error", zap.Int("status-code", http.StatusMethodNotAllowed)) return } - excludedAlarms := getExcludedAlarms(r) + excludedAlarms := getQuerySet(r, "exclude") // Passing the query parameter "serializable=true" ensures that the // health of the local etcd is checked vs the health of the cluster. // This is useful for probes attempting to validate the liveness of @@ -119,20 +125,18 @@ type Health struct { Reason string `json:"reason"` } -type AlarmSet map[string]struct{} - -func getExcludedAlarms(r *http.Request) (alarms AlarmSet) { - alarms = make(map[string]struct{}, 2) - alms, found := r.URL.Query()["exclude"] +func getQuerySet(r *http.Request, query string) StringSet { + querySet := make(map[string]struct{}) + qs, found := r.URL.Query()[query] if found { - for _, alm := range alms { - if len(alm) == 0 { + for _, q := range qs { + if len(q) == 0 { continue } - alarms[alm] = struct{}{} + querySet[q] = struct{}{} } } - return alarms + return querySet } func getSerializableFlag(r *http.Request) bool { @@ -141,7 +145,7 @@ func getSerializableFlag(r *http.Request) bool { // TODO: etcdserver.ErrNoLeader in health API -func checkAlarms(lg *zap.Logger, srv ServerHealth, excludedAlarms AlarmSet) Health { +func checkAlarms(lg *zap.Logger, srv ServerHealth, excludedAlarms StringSet) Health { h := Health{Health: "true"} for _, v := range srv.Alarms() { @@ -193,3 +197,171 @@ func checkAPI(ctx context.Context, lg *zap.Logger, srv ServerHealth, serializabl lg.Debug("serving /health true") return h } + +type HealthCheck func(ctx context.Context) error + +type CheckRegistry struct { + path string + checks map[string]HealthCheck +} + +func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { + reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck)} + reg.Register("serializable_read", serializableReadCheck(server)) + reg.InstallHttpEndpoints(lg, mux) +} + +func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { + reg := CheckRegistry{path: "/readyz", checks: make(map[string]HealthCheck)} + reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT)) + reg.Register("serializable_read", serializableReadCheck(server)) + reg.InstallHttpEndpoints(lg, mux) +} + +func (reg *CheckRegistry) Register(name string, check HealthCheck) { + reg.checks[name] = check +} + +func (reg *CheckRegistry) InstallHttpEndpoints(lg *zap.Logger, mux *http.ServeMux) { + checkNames := make([]string, 0, len(reg.checks)) + for k := range reg.checks { + checkNames = append(checkNames, k) + } + + // installs the http handler for the root path. + reg.installRootHttpEndpoint(lg, mux, reg.path, checkNames...) + for _, checkName := range checkNames { + // installs the http handler for the individual check sub path. + subpath := path.Join(reg.path, checkName) + check := checkName + mux.Handle(subpath, newHealthHandler(subpath, lg, func(r *http.Request) Health { + return reg.runHealthChecks(r.Context(), check) + })) + } +} + +func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...string) Health { + h := Health{Health: "true"} + var individualCheckOutput bytes.Buffer + for _, checkName := range checkNames { + check, found := reg.checks[checkName] + if !found { + panic(fmt.Errorf("Health check: %s not registered", checkName)) + } + if err := check(ctx); err != nil { + fmt.Fprintf(&individualCheckOutput, "[-]%s failed: %v\n", checkName, err) + h.Health = "false" + } else { + fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", checkName) + } + } + h.Reason = individualCheckOutput.String() + return h +} + +// installRootHttpEndpoint installs the http handler for the root path. +func (reg *CheckRegistry) installRootHttpEndpoint(lg *zap.Logger, mux *http.ServeMux, path string, checks ...string) { + hfunc := func(r *http.Request) Health { + // extracts the health check names to be excludeList from the query param + excluded := getQuerySet(r, "exclude") + + filteredCheckNames := filterCheckList(lg, listToStringSet(checks), excluded) + return reg.runHealthChecks(r.Context(), filteredCheckNames...) + } + mux.Handle(path, newHealthHandler(path, lg, hfunc)) +} + +// newHealthHandler generates a http HandlerFunc for a health check function hfunc. +func newHealthHandler(path string, lg *zap.Logger, hfunc func(*http.Request) Health) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("Allow", http.MethodGet) + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + lg.Warn("Health request error", zap.String("path", path), zap.Int("status-code", http.StatusMethodNotAllowed)) + return + } + h := hfunc(r) + // Always returns detailed reason for failed checks. + if h.Health != "true" { + http.Error(w, h.Reason, http.StatusServiceUnavailable) + lg.Error("Health check error", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusServiceUnavailable)) + return + } + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.Header().Set("X-Content-Type-Options", "nosniff") + // Only writes detailed reason for verbose requests. + if _, found := r.URL.Query()["verbose"]; found { + fmt.Fprint(w, h.Reason) + } + fmt.Fprint(w, "ok\n") + lg.Debug("Health check OK", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusOK)) + } +} + +func filterCheckList(lg *zap.Logger, checks StringSet, excluded StringSet) []string { + filteredList := []string{} + for chk := range checks { + if _, found := excluded[chk]; found { + delete(excluded, chk) + continue + } + filteredList = append(filteredList, chk) + } + if len(excluded) > 0 { + // For version compatibility, excluding non-exist checks would not fail the request. + lg.Warn("some health checks cannot be excluded", zap.String("missing-health-checks", formatQuoted(excluded.List()...))) + } + return filteredList +} + +// formatQuoted returns a formatted string of the health check names, +// preserving the order passed in. +func formatQuoted(names ...string) string { + quoted := make([]string, 0, len(names)) + for _, name := range names { + quoted = append(quoted, fmt.Sprintf("%q", name)) + } + return strings.Join(quoted, ",") +} + +type StringSet map[string]struct{} + +func (s StringSet) List() []string { + keys := make([]string, 0, len(s)) + for k := range s { + keys = append(keys, k) + } + return keys +} + +func listToStringSet(list []string) StringSet { + set := make(map[string]struct{}) + for _, s := range list { + set[s] = struct{}{} + } + return set +} + +// activeAlarmCheck checks if a specific alarm type is active in the server. +func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) error { + return func(ctx context.Context) error { + as := srv.Alarms() + for _, v := range as { + if v.Alarm == at { + return fmt.Errorf("alarm activated: %s", at.String()) + } + } + return nil + } +} + +func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error { + return func(ctx context.Context) error { + ctx = srv.AuthStore().WithRoot(ctx) + _, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true}) + if err != nil { + return fmt.Errorf("range error: %w", err) + } + return nil + } +} diff --git a/server/etcdserver/api/etcdhttp/health_test.go b/server/etcdserver/api/etcdhttp/health_test.go index 1955ffef380..122fbf6adcf 100644 --- a/server/etcdserver/api/etcdhttp/health_test.go +++ b/server/etcdserver/api/etcdhttp/health_test.go @@ -16,11 +16,11 @@ package etcdhttp import ( "context" - "encoding/json" "fmt" "io" "net/http" "net/http/httptest" + "strings" "testing" "go.uber.org/zap/zaptest" @@ -32,16 +32,15 @@ import ( "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/config" - "go.etcd.io/etcd/server/v3/etcdserver" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" "go.etcd.io/etcd/server/v3/storage/schema" ) type fakeHealthServer struct { fakeServer - health string - apiError error - authStore auth.AuthStore + apiError error + missingLeader bool + authStore auth.AuthStore } func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) { @@ -53,87 +52,91 @@ func (s *fakeHealthServer) Config() config.ServerConfig { } func (s *fakeHealthServer) Leader() types.ID { - if s.health == "true" { + if !s.missingLeader { return 1 } return types.ID(raft.None) } -func (s *fakeHealthServer) Do(_ context.Context, _ pb.Request) (etcdserver.Response, error) { - if s.health == "true" { - return etcdserver.Response{}, nil - } - return etcdserver.Response{}, fmt.Errorf("fail health check") -} -func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore } + +func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore } + func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false } +type healthTestCase struct { + name string + healthCheckURL string + expectStatusCode int + inResult []string + notInResult []string + + alarms []*pb.AlarmMember + apiError error + missingLeader bool +} + func TestHealthHandler(t *testing.T) { // define the input and expected output // input: alarms, and healthCheckURL - tests := []struct { - name string - alarms []*pb.AlarmMember - healthCheckURL string - apiError error - - expectStatusCode int - expectHealth string - }{ + tests := []healthTestCase{ { name: "Healthy if no alarm", alarms: []*pb.AlarmMember{}, healthCheckURL: "/health", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Unhealthy if NOSPACE alarm is on", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, healthCheckURL: "/health", expectStatusCode: http.StatusServiceUnavailable, - expectHealth: "false", }, { name: "Healthy if NOSPACE alarm is on and excluded", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Healthy if NOSPACE alarm is excluded", alarms: []*pb.AlarmMember{}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Healthy if multiple NOSPACE alarms are on and excluded", alarms: []*pb.AlarmMember{{MemberID: uint64(1), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(2), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(3), Alarm: pb.AlarmType_NOSPACE}}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Unhealthy if NOSPACE alarms is excluded and CORRUPT is on", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusServiceUnavailable, - expectHealth: "false", }, { name: "Unhealthy if both NOSPACE and CORRUPT are on and excluded", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}}, healthCheckURL: "/health?exclude=NOSPACE&exclude=CORRUPT", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Unhealthy if api is not available", healthCheckURL: "/health", apiError: fmt.Errorf("Unexpected error"), expectStatusCode: http.StatusServiceUnavailable, - expectHealth: "false", + }, + { + name: "Unhealthy if no leader", + healthCheckURL: "/health", + expectStatusCode: http.StatusServiceUnavailable, + missingLeader: true, + }, + { + name: "Healthy if no leader and serializable=true", + healthCheckURL: "/health?serializable=true", + expectStatusCode: http.StatusOK, + missingLeader: true, }, } @@ -144,44 +147,179 @@ func TestHealthHandler(t *testing.T) { be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be) HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{ - fakeServer: fakeServer{alarms: tt.alarms}, - health: tt.expectHealth, - apiError: tt.apiError, - authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0), + fakeServer: fakeServer{alarms: tt.alarms}, + apiError: tt.apiError, + missingLeader: tt.missingLeader, + authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0), }) ts := httptest.NewServer(mux) defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, nil, nil) + }) + } +} - res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+tt.healthCheckURL)}) - if err != nil { - t.Errorf("fail serve http request %s %v", tt.healthCheckURL, err) - } - if res == nil { - t.Errorf("got nil http response with http request %s", tt.healthCheckURL) - return - } - if res.StatusCode != tt.expectStatusCode { - t.Errorf("want statusCode %d but got %d", tt.expectStatusCode, res.StatusCode) +func TestHttpSubPath(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "/readyz/data_corruption ok", + healthCheckURL: "/readyz/data_corruption", + expectStatusCode: http.StatusOK, + }, + { + name: "/readyz/serializable_read not ok with error", + apiError: fmt.Errorf("Unexpected error"), + healthCheckURL: "/readyz/serializable_read", + expectStatusCode: http.StatusServiceUnavailable, + notInResult: []string{"data_corruption"}, + }, + { + name: "/readyz/non_exist 404", + healthCheckURL: "/readyz/non_exist", + expectStatusCode: http.StatusNotFound, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + apiError: tt.apiError, + authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0), } - health, err := parseHealthOutput(res.Body) - if err != nil { - t.Errorf("fail parse health check output %v", err) + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + }) + } +} + +func TestDataCorruptionCheck(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "Live if CORRUPT alarm is on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}}, + healthCheckURL: "/livez", + expectStatusCode: http.StatusOK, + notInResult: []string{"data_corruption"}, + }, + { + name: "Not ready if CORRUPT alarm is on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}}, + healthCheckURL: "/readyz", + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]data_corruption failed: alarm activated: CORRUPT"}, + }, + { + name: "ready if CORRUPT alarm is not on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, + healthCheckURL: "/readyz", + expectStatusCode: http.StatusOK, + }, + { + name: "ready if CORRUPT alarm is excluded", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}, {MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, + healthCheckURL: "/readyz?exclude=data_corruption", + expectStatusCode: http.StatusOK, + }, + { + name: "Not ready if CORRUPT alarm is on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}}, + healthCheckURL: "/readyz?exclude=non_exist", + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]data_corruption failed: alarm activated: CORRUPT"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0), } - if health.Health != tt.expectHealth { - t.Errorf("want health %s but got %s", tt.expectHealth, health.Health) + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + // OK before alarms are activated. + checkHttpResponse(t, ts, tt.healthCheckURL, http.StatusOK, nil, nil) + // Activate the alarms. + s.alarms = tt.alarms + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + }) + } +} + +func TestSerializableReadCheck(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "Alive normal", + healthCheckURL: "/livez?verbose", + expectStatusCode: http.StatusOK, + inResult: []string{"[+]serializable_read ok"}, + }, + { + name: "Not alive if range api is not available", + healthCheckURL: "/livez", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]serializable_read failed: range error: Unexpected error"}, + }, + { + name: "Not ready if range api is not available", + healthCheckURL: "/readyz", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]serializable_read failed: range error: Unexpected error"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + apiError: tt.apiError, + authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0), } + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) }) } } -func parseHealthOutput(body io.Reader) (Health, error) { - obj := Health{} - d, derr := io.ReadAll(body) - if derr != nil { - return obj, derr +func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) { + res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)}) + + if err != nil { + t.Fatalf("fail serve http request %s %v", url, err) + } + if res.StatusCode != expectStatusCode { + t.Errorf("want statusCode %d but got %d", expectStatusCode, res.StatusCode) + } + defer res.Body.Close() + b, err := io.ReadAll(res.Body) + if err != nil { + t.Fatalf("Failed to read response for %s", url) + } + result := string(b) + for _, substr := range inResult { + if !strings.Contains(result, substr) { + t.Errorf("Could not find substring : %s, in response: %s", substr, result) + return + } } - if err := json.Unmarshal(d, &obj); err != nil { - return obj, err + for _, substr := range notInResult { + if strings.Contains(result, substr) { + t.Errorf("Do not expect substring : %s, in response: %s", substr, result) + return + } } - return obj, nil } diff --git a/server/proxy/grpcproxy/health.go b/server/proxy/grpcproxy/health.go index 456564f1047..b4a683242aa 100644 --- a/server/proxy/grpcproxy/health.go +++ b/server/proxy/grpcproxy/health.go @@ -32,7 +32,7 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) { if lg == nil { lg = zap.NewNop() } - mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { + mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.StringSet, serializable bool) etcdhttp.Health { return checkHealth(c) })) } @@ -42,7 +42,7 @@ func HandleProxyHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) { if lg == nil { lg = zap.NewNop() } - mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { + mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.StringSet, serializable bool) etcdhttp.Health { return checkProxyHealth(c) })) }