Skip to content

Commit

Permalink
etcdserver: add livez and ready http endpoints for etcd.
Browse files Browse the repository at this point in the history
Add two separate probes, one for liveness and one for readiness. The liveness probe would check that the local individual node is up and running, or else restart the node, while the readiness probe would check that the cluster is ready to serve traffic. This would make etcd health-check fully Kubernetes API complient.

Signed-off-by: Siyuan Zhang <[email protected]>
  • Loading branch information
siyuanfoundation committed Oct 13, 2023
1 parent 0c0128a commit 9e30c01
Show file tree
Hide file tree
Showing 3 changed files with 371 additions and 70 deletions.
190 changes: 177 additions & 13 deletions server/etcdserver/api/etcdhttp/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
package etcdhttp

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"path"
"strings"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand All @@ -40,12 +43,13 @@ type ServerHealth interface {
Leader() types.ID
Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error)
Config() config.ServerConfig
AuthStore() auth.AuthStore
}

// HandleHealth registers metrics and health handlers. it checks health by using v3 range request
// and its corresponding timeout.
func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool) Health {
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms StringSet, serializable bool) Health {
if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" {
return h
}
Expand All @@ -54,18 +58,21 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
}
return checkAPI(lg, srv, serializable)
}))

installLivezEndpoints(lg, mux, srv)
installReadyzEndpoints(lg, mux, srv)
}

// NewHealthHandler handles '/health' requests.
func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet, Serializable bool) Health) http.HandlerFunc {
func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms StringSet, Serializable bool) Health) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
lg.Warn("/health error", zap.Int("status-code", http.StatusMethodNotAllowed))
return
}
excludedAlarms := getExcludedAlarms(r)
excludedAlarms := getQuerySet(r, "exclude")
// Passing the query parameter "serializable=true" ensures that the
// health of the local etcd is checked vs the health of the cluster.
// This is useful for probes attempting to validate the liveness of
Expand Down Expand Up @@ -118,20 +125,18 @@ type Health struct {
Reason string `json:"reason"`
}

type AlarmSet map[string]struct{}

func getExcludedAlarms(r *http.Request) (alarms AlarmSet) {
alarms = make(map[string]struct{}, 2)
alms, found := r.URL.Query()["exclude"]
func getQuerySet(r *http.Request, query string) StringSet {
querySet := make(map[string]struct{})
qs, found := r.URL.Query()[query]
if found {
for _, alm := range alms {
if len(alm) == 0 {
for _, q := range qs {
if len(q) == 0 {
continue
}
alarms[alm] = struct{}{}
querySet[q] = struct{}{}
}
}
return alarms
return querySet
}

func getSerializableFlag(r *http.Request) bool {
Expand All @@ -140,7 +145,7 @@ func getSerializableFlag(r *http.Request) bool {

// TODO: etcdserver.ErrNoLeader in health API

func checkAlarms(lg *zap.Logger, srv ServerHealth, excludedAlarms AlarmSet) Health {
func checkAlarms(lg *zap.Logger, srv ServerHealth, excludedAlarms StringSet) Health {
h := Health{Health: "true"}
as := srv.Alarms()
if len(as) > 0 {
Expand Down Expand Up @@ -193,3 +198,162 @@ func checkAPI(lg *zap.Logger, srv ServerHealth, serializable bool) Health {
lg.Debug("serving /health true")
return h
}

type HealthCheck func(ctx context.Context) error

type CheckRegistry struct {
path string
checks map[string]HealthCheck
}

func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", serializableReadCheck(server))
reg.InstallHttpEndpoints(lg, mux)
}

func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{path: "/readyz", checks: make(map[string]HealthCheck)}
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
reg.Register("serializable_read", serializableReadCheck(server))
reg.InstallHttpEndpoints(lg, mux)
}

func (reg *CheckRegistry) Register(name string, check HealthCheck) {
reg.checks[name] = check
}

func (reg *CheckRegistry) InstallHttpEndpoints(lg *zap.Logger, mux *http.ServeMux) {
checkNames := make([]string, 0, len(reg.checks))
for k := range reg.checks {
checkNames = append(checkNames, k)
}

// installs the http handler for the root path.
reg.installRootHttpEndpoint(lg, mux, reg.path, checkNames...)
for _, checkName := range checkNames {
// installs the http handler for the individual check sub path.
subpath := path.Join(reg.path, checkName)
check := checkName
mux.Handle(subpath, newHealthHandler(subpath, lg,
func(r *http.Request) Health { return reg.runHealthChecks(r.Context(), check) }))
}
}

func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...string) Health {
h := Health{Health: "true"}
var individualCheckOutput bytes.Buffer
for _, checkName := range checkNames {
check, found := reg.checks[checkName]
if !found {
panic(fmt.Errorf("Health check: %s not registered", checkName))
}
if err := check(ctx); err != nil {
fmt.Fprintf(&individualCheckOutput, "[-]%s failed: %v\n", checkName, err)
h.Health = "false"
} else {
fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", checkName)
}
}
h.Reason = individualCheckOutput.String()
return h
}

// installRootHttpEndpoint installs the http handler for the root path.
func (reg *CheckRegistry) installRootHttpEndpoint(lg *zap.Logger, mux *http.ServeMux, path string, checks ...string) {
hfunc := func(r *http.Request) Health {
// extracts the health check names to be excludeList from the query param
excluded := getQuerySet(r, "exclude")

filteredCheckNames := filterCheckList(lg, checks, excluded)
return reg.runHealthChecks(r.Context(), filteredCheckNames...)
}
mux.Handle(path, newHealthHandler(path, lg, hfunc))
}

// newHealthHandler generates a http HandlerFunc for a health check function hfunc.
func newHealthHandler(path string, lg *zap.Logger, hfunc func(*http.Request) Health) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
lg.Warn("Health request error", zap.String("path", path), zap.Int("status-code", http.StatusMethodNotAllowed))
return
}
h := hfunc(r)
// Always returns detailed reason for failed checks.
if h.Health != "true" {
http.Error(w, h.Reason, http.StatusServiceUnavailable)
lg.Error("Health check error", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusServiceUnavailable))
return
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
// Only writes detailed reason for verbose requests.
if _, found := r.URL.Query()["verbose"]; found {
fmt.Fprint(w, h.Reason)
}
fmt.Fprint(w, "ok\n")
lg.Debug("Health check OK", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusOK))
}
}

func filterCheckList(lg *zap.Logger, checks []string, excluded StringSet) []string {
filteredList := []string{}
for _, chk := range checks {
if _, found := excluded[chk]; found {
delete(excluded, chk)
continue
}
filteredList = append(filteredList, chk)
}
if len(excluded) > 0 {
// For version compatibility, excluding non-exist checks would not fail the request.
lg.Warn("some health checks cannot be excluded", zap.String("missing-health-checks", formatQuoted(excluded.List()...)))
}
return filteredList
}

// formatQuoted returns a formatted string of the health check names,
// preserving the order passed in.
func formatQuoted(names ...string) string {
quoted := make([]string, 0, len(names))
for _, name := range names {
quoted = append(quoted, fmt.Sprintf("%q", name))
}
return strings.Join(quoted, ",")
}

type StringSet map[string]struct{}

func (s StringSet) List() []string {
keys := make([]string, 0, len(s))
for k := range s {
keys = append(keys, k)
}
return keys
}

// activeAlarmCheck checks if a specific alarm type is active in the server.
func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) error {
return func(ctx context.Context) error {
as := srv.Alarms()
for _, v := range as {
if v.Alarm == at {
return fmt.Errorf("Alarm active: %s", at.String())
}
}
return nil
}
}

func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
return func(ctx context.Context) error {
ctx = srv.AuthStore().WithRoot(ctx)
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
if err != nil {
return fmt.Errorf("Range error: %s", err)
}
return nil
}
}
Loading

0 comments on commit 9e30c01

Please sign in to comment.