Skip to content

Commit

Permalink
Merge pull request #16651 from siyuanfoundation/livez-pr
Browse files Browse the repository at this point in the history
Add livez and readyz for etcd
  • Loading branch information
serathius authored Oct 17, 2023
2 parents e51b639 + 7a57e06 commit cba514e
Show file tree
Hide file tree
Showing 3 changed files with 382 additions and 72 deletions.
198 changes: 185 additions & 13 deletions server/etcdserver/api/etcdhttp/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
package etcdhttp

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"path"
"strings"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand Down Expand Up @@ -46,7 +49,7 @@ type ServerHealth interface {
// HandleHealth registers metrics and health handlers. it checks health by using v3 range request
// and its corresponding timeout.
func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms AlarmSet, serializable bool) Health {
mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health {
if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" {
return h
}
Expand All @@ -55,18 +58,21 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) {
}
return checkAPI(ctx, lg, srv, serializable)
}))

installLivezEndpoints(lg, mux, srv)
installReadyzEndpoints(lg, mux, srv)
}

// NewHealthHandler handles '/health' requests.
func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAlarms AlarmSet, Serializable bool) Health) http.HandlerFunc {
func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAlarms StringSet, Serializable bool) Health) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
lg.Warn("/health error", zap.Int("status-code", http.StatusMethodNotAllowed))
return
}
excludedAlarms := getExcludedAlarms(r)
excludedAlarms := getQuerySet(r, "exclude")
// Passing the query parameter "serializable=true" ensures that the
// health of the local etcd is checked vs the health of the cluster.
// This is useful for probes attempting to validate the liveness of
Expand Down Expand Up @@ -119,20 +125,18 @@ type Health struct {
Reason string `json:"reason"`
}

type AlarmSet map[string]struct{}

func getExcludedAlarms(r *http.Request) (alarms AlarmSet) {
alarms = make(map[string]struct{}, 2)
alms, found := r.URL.Query()["exclude"]
func getQuerySet(r *http.Request, query string) StringSet {
querySet := make(map[string]struct{})
qs, found := r.URL.Query()[query]
if found {
for _, alm := range alms {
if len(alm) == 0 {
for _, q := range qs {
if len(q) == 0 {
continue
}
alarms[alm] = struct{}{}
querySet[q] = struct{}{}
}
}
return alarms
return querySet
}

func getSerializableFlag(r *http.Request) bool {
Expand All @@ -141,7 +145,7 @@ func getSerializableFlag(r *http.Request) bool {

// TODO: etcdserver.ErrNoLeader in health API

func checkAlarms(lg *zap.Logger, srv ServerHealth, excludedAlarms AlarmSet) Health {
func checkAlarms(lg *zap.Logger, srv ServerHealth, excludedAlarms StringSet) Health {
h := Health{Health: "true"}

for _, v := range srv.Alarms() {
Expand Down Expand Up @@ -193,3 +197,171 @@ func checkAPI(ctx context.Context, lg *zap.Logger, srv ServerHealth, serializabl
lg.Debug("serving /health true")
return h
}

type HealthCheck func(ctx context.Context) error

type CheckRegistry struct {
path string
checks map[string]HealthCheck
}

func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", serializableReadCheck(server))
reg.InstallHttpEndpoints(lg, mux)
}

func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{path: "/readyz", checks: make(map[string]HealthCheck)}
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
reg.Register("serializable_read", serializableReadCheck(server))
reg.InstallHttpEndpoints(lg, mux)
}

func (reg *CheckRegistry) Register(name string, check HealthCheck) {
reg.checks[name] = check
}

func (reg *CheckRegistry) InstallHttpEndpoints(lg *zap.Logger, mux *http.ServeMux) {
checkNames := make([]string, 0, len(reg.checks))
for k := range reg.checks {
checkNames = append(checkNames, k)
}

// installs the http handler for the root path.
reg.installRootHttpEndpoint(lg, mux, reg.path, checkNames...)
for _, checkName := range checkNames {
// installs the http handler for the individual check sub path.
subpath := path.Join(reg.path, checkName)
check := checkName
mux.Handle(subpath, newHealthHandler(subpath, lg, func(r *http.Request) Health {
return reg.runHealthChecks(r.Context(), check)
}))
}
}

func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...string) Health {
h := Health{Health: "true"}
var individualCheckOutput bytes.Buffer
for _, checkName := range checkNames {
check, found := reg.checks[checkName]
if !found {
panic(fmt.Errorf("Health check: %s not registered", checkName))
}
if err := check(ctx); err != nil {
fmt.Fprintf(&individualCheckOutput, "[-]%s failed: %v\n", checkName, err)
h.Health = "false"
} else {
fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", checkName)
}
}
h.Reason = individualCheckOutput.String()
return h
}

// installRootHttpEndpoint installs the http handler for the root path.
func (reg *CheckRegistry) installRootHttpEndpoint(lg *zap.Logger, mux *http.ServeMux, path string, checks ...string) {
hfunc := func(r *http.Request) Health {
// extracts the health check names to be excludeList from the query param
excluded := getQuerySet(r, "exclude")

filteredCheckNames := filterCheckList(lg, listToStringSet(checks), excluded)
return reg.runHealthChecks(r.Context(), filteredCheckNames...)
}
mux.Handle(path, newHealthHandler(path, lg, hfunc))
}

// newHealthHandler generates a http HandlerFunc for a health check function hfunc.
func newHealthHandler(path string, lg *zap.Logger, hfunc func(*http.Request) Health) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
lg.Warn("Health request error", zap.String("path", path), zap.Int("status-code", http.StatusMethodNotAllowed))
return
}
h := hfunc(r)
// Always returns detailed reason for failed checks.
if h.Health != "true" {
http.Error(w, h.Reason, http.StatusServiceUnavailable)
lg.Error("Health check error", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusServiceUnavailable))
return
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
// Only writes detailed reason for verbose requests.
if _, found := r.URL.Query()["verbose"]; found {
fmt.Fprint(w, h.Reason)
}
fmt.Fprint(w, "ok\n")
lg.Debug("Health check OK", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusOK))
}
}

func filterCheckList(lg *zap.Logger, checks StringSet, excluded StringSet) []string {
filteredList := []string{}
for chk := range checks {
if _, found := excluded[chk]; found {
delete(excluded, chk)
continue
}
filteredList = append(filteredList, chk)
}
if len(excluded) > 0 {
// For version compatibility, excluding non-exist checks would not fail the request.
lg.Warn("some health checks cannot be excluded", zap.String("missing-health-checks", formatQuoted(excluded.List()...)))
}
return filteredList
}

// formatQuoted returns a formatted string of the health check names,
// preserving the order passed in.
func formatQuoted(names ...string) string {
quoted := make([]string, 0, len(names))
for _, name := range names {
quoted = append(quoted, fmt.Sprintf("%q", name))
}
return strings.Join(quoted, ",")
}

type StringSet map[string]struct{}

func (s StringSet) List() []string {
keys := make([]string, 0, len(s))
for k := range s {
keys = append(keys, k)
}
return keys
}

func listToStringSet(list []string) StringSet {
set := make(map[string]struct{})
for _, s := range list {
set[s] = struct{}{}
}
return set
}

// activeAlarmCheck checks if a specific alarm type is active in the server.
func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) error {
return func(ctx context.Context) error {
as := srv.Alarms()
for _, v := range as {
if v.Alarm == at {
return fmt.Errorf("alarm activated: %s", at.String())
}
}
return nil
}
}

func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
return func(ctx context.Context) error {
ctx = srv.AuthStore().WithRoot(ctx)
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
if err != nil {
return fmt.Errorf("range error: %w", err)
}
return nil
}
}
Loading

0 comments on commit cba514e

Please sign in to comment.