Skip to content

Commit

Permalink
etcdserver: add livez and ready http endpoints for etcd.
Browse files Browse the repository at this point in the history
Add two separate probes, one for liveness and one for readiness. The liveness probe would check that the local individual node is up and running, or else restart the node, while the readiness probe would check that the cluster is ready to serve traffic. This would make etcd health-check fully Kubernetes API complient.

Signed-off-by: Siyuan Zhang <[email protected]>
  • Loading branch information
siyuanfoundation committed Sep 27, 2023
1 parent e85949d commit f5d4788
Show file tree
Hide file tree
Showing 4 changed files with 726 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ func (e *Etcd) serveClients() (err error) {
etcdhttp.HandleVersion(mux, e.Server)
etcdhttp.HandleMetrics(mux)
etcdhttp.HandleHealth(e.cfg.logger, mux, e.Server)
e.Server.InstallLivezReadyz(e.cfg.logger, mux)

var gopts []grpc.ServerOption
if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
Expand Down
388 changes: 388 additions & 0 deletions server/etcdserver/healthz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,388 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
"bytes"
"context"
"fmt"
"net/http"
"strings"
"sync"
"time"

"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/auth"
)

const (
PathLivez = "/livez"
PathReadyz = "/readyz"
PathHealthz = "/healthz"
)

type EtcdServerHealth interface {
Alarms() []*etcdserverpb.AlarmMember
Range(context.Context, *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error)
}

// mux is an interface describing the methods InstallHandler requires.
type mux interface {
Handle(pattern string, handler http.Handler)
}

type HealthChecker interface {
Check() error
}

type HealthStatus struct {
IsOk bool `json:"isOk"`
Reason string `json:"reason"`
}

// InstallLivezReadyz creates the livez+readyz+healthz http endpoint for this server.
func (s *EtcdServer) InstallLivezReadyz(lg *zap.Logger, mux mux) {
s.healthHandler.InstallLivezReadyz(lg, mux)
}

// AddHealthCheck allows you to add a HealthChecker to the registery. It can be tied to livez or readyz or both.
func (s *EtcdServer) AddHealthCheck(name string, check HealthChecker, isLivez bool, isReadyz bool) error {
return s.healthHandler.AddHealthCheck(name, check, isLivez, isReadyz)
}

type HealthHandler struct {
server EtcdServerHealth
// lock for health check related functions.
healthMux sync.Mutex
// stores all the added health checks, map of HealthChecker.Name() : HealthChecker
healthCheckStore map[string]HealthChecker
// default checks for healthz endpoint, which is all the keys in healthCheckStore.
healthzChecks []string
healthzChecksInstalled bool

// default checks for livez endpoint
livezChecks []string
livezChecksInstalled bool
// default checks for readyz endpoint
readyzChecks []string
readyzChecksInstalled bool
}

func NewHealthHandler(s EtcdServerHealth) (handler *HealthHandler, err error) {
handler = &HealthHandler{
server: s,
healthCheckStore: make(map[string]HealthChecker),
healthzChecks: []string{},
livezChecks: []string{},
readyzChecks: []string{},
}
if err = handler.addDefaultHealthChecks(); err != nil {
return nil, err
}
return handler, nil
}

// PingHealthz returns true automatically when checked
var PingHealthz HealthChecker = ping{}

// ping implements the simplest possible healthz checker.
type ping struct{}

func (ping) Check() error {
return nil
}

// healthzCheck implements HealthChecker on an arbitrary check function.
type healthzCheck struct {
check func() error
}

func (c *healthzCheck) Check() error {
return c.check()
}

// NewHealthChecker returns a healthz checker for the given function.
func NewHealthChecker(check func() error) HealthChecker {
return &healthzCheck{check}
}

func (h *HealthHandler) addDefaultHealthChecks() error {
// ----------------- checks that should be included both in livez and readyz -----------------
// Add a default simping ping check.
h.AddHealthCheck("ping", PingHealthz, true, true)
serializableReadCheck := NewHealthChecker(func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err := h.server.Range(ctx, &etcdserverpb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
cancel()
if err != nil && err != auth.ErrUserEmpty && err != auth.ErrPermissionDenied {
return fmt.Errorf("RANGE ERROR:%s", err)
}
return nil
})
h.AddHealthCheck("serializable_read", serializableReadCheck, true, true)
// --------------------------------------------------------------------
// ----------------- checks that should be included only in livez -----------------

// --------------------------------------------------------------------
// ----------------- checks that should be included only in readyz -----------------
corruptionAlarmCheck := NewHealthChecker(func() error {
return checkAlarm(h.server, etcdserverpb.AlarmType_CORRUPT)
})
h.AddHealthCheck("data_corruption", corruptionAlarmCheck, false, true)
// --------------------------------------------------------------------
return nil
}

// AddHealthCheck allows you to add a HealthChecker to the registery. It can be tied to livez or readyz or both.
func (h *HealthHandler) AddHealthCheck(name string, check HealthChecker, isLivez bool, isReadyz bool) error {
h.healthMux.Lock()
defer h.healthMux.Unlock()
if _, found := h.healthCheckStore[name]; found {
return fmt.Errorf("Health check with the name of %s already exists.", name)
}
// Any new health checks can only be added before the healthz endpoint is created.
if h.healthzChecksInstalled {
return fmt.Errorf("unable to add because the healthz endpoint has already been created")
}
// New livez checks can only be added before the livez endpoint is created.
if isLivez {
if h.livezChecksInstalled {
return fmt.Errorf("unable to add because the livez endpoint has already been created")
}
if isReadyz && h.readyzChecksInstalled {
return fmt.Errorf("unable to add because the readyz endpoint has already been created")
}
h.livezChecks = append(h.livezChecks, name)
}
// New readyz checks can only be added before the readyz endpoint is created.
if isReadyz {
if h.readyzChecksInstalled {
return fmt.Errorf("unable to add because the readyz endpoint has already been created")
}
h.readyzChecks = append(h.readyzChecks, name)
}
h.healthCheckStore[name] = check
h.healthzChecks = append(h.healthzChecks, name)
return nil
}

// installHealthz creates the healthz endpoint for this server.
func (h *HealthHandler) installHealthz(lg *zap.Logger, mux mux) {
h.healthMux.Lock()
defer h.healthMux.Unlock()
h.healthzChecksInstalled = true
h.installPathHandler(lg, mux, PathHealthz, h.healthzChecks)
}

// installReadyz creates the readyz endpoint for this server.
func (h *HealthHandler) installReadyz(lg *zap.Logger, mux mux) {
h.healthMux.Lock()
defer h.healthMux.Unlock()
h.readyzChecksInstalled = true
h.installPathHandler(lg, mux, PathReadyz, h.readyzChecks)
}

// installLivez creates the livez endpoint for this server.
func (h *HealthHandler) installLivez(lg *zap.Logger, mux mux) {
h.healthMux.Lock()
defer h.healthMux.Unlock()
h.livezChecksInstalled = true
h.installPathHandler(lg, mux, PathLivez, h.livezChecks)
}

// InstallLivezReadyz creates the livez+readyz+healthz endpoint for this server.
func (h *HealthHandler) InstallLivezReadyz(lg *zap.Logger, mux mux) {
h.installLivez(lg, mux)
h.installReadyz(lg, mux)
h.installHealthz(lg, mux)
}

// installPathHandler registers handlers for health checking on
// a specific path to mux. *All handlers* for the path must be
// specified in exactly one call to installPathHandler. Calling
// installPathHandler more than once for the same path and mux will
// result in a panic.
func (h *HealthHandler) installPathHandler(lg *zap.Logger, mux mux, path string, checks []string) {
if len(checks) == 0 {
lg.Info("No default health checks specified. Installing the ping handler.")
checks = []string{"ping"}
}

lg.Sugar().Infof("Installing health checkers for (%v): %v", path, formatQuoted(checks...))

name := strings.Split(strings.TrimPrefix(path, "/"), "/")[0]
mux.Handle(path,
h.handleRootHealth(lg, name, checks))
for _, check := range checks {
chk, found := h.healthCheckStore[check]
if !found {
lg.Sugar().Warnf("HealthChecker %s not registered.", check)
continue
}
mux.Handle(fmt.Sprintf("%s/%v", path, check), adaptCheckToHandler(chk.Check))
}
}

func (h *HealthHandler) checkHealth(lg *zap.Logger, path string, excludeList, allowList, checks []string) (HealthStatus, error) {
healthStatus := HealthStatus{IsOk: true}
if len(excludeList) > 0 && len(allowList) > 0 {
return healthStatus, fmt.Errorf("do not expect both allowlist: %v and exclude list: %v to be both specified in health check.", allowList, excludeList)
}
excluded := listToStringSet(excludeList)
included := listToStringSet(allowList)
failedChecks := []string{}
// failedVerboseLogOutput is for output to the log. It indicates detailed failed output information for the log.
var individualCheckOutput, failedVerboseLogOutput bytes.Buffer
for _, check := range checks {
if len(allowList) > 0 {
if _, found := included[check]; !found {
fmt.Fprintf(&individualCheckOutput, "[+]%s not included: ok\n", check)
continue
}
delete(included, check)
} else {
// no-op the check if we've specified we want to exclude the check
if _, found := excluded[check]; found {
delete(excluded, check)
fmt.Fprintf(&individualCheckOutput, "[+]%s excluded: ok\n", check)
continue
}
}
chk, found := h.healthCheckStore[check]
if !found {
lg.Sugar().Warnf("HealthChecker %s not registered.", check)
continue
}
if err := chk.Check(); err != nil {
// don't include the error since this endpoint is public. If someone wants more detail
// they should have explicit permission to the detailed checks.
fmt.Fprintf(&individualCheckOutput, "[-]%s failed: reason withheld\n", check)
// but we do want detailed information for our log
fmt.Fprintf(&failedVerboseLogOutput, "[-]%s failed: %v\n", check, err)
failedChecks = append(failedChecks, check)
} else {
fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", check)
}
}
if len(excluded) > 0 {
fmt.Fprintf(&individualCheckOutput, "warn: some health checks cannot be excluded: no matches for %s\n", formatQuoted(excluded.List()...))
lg.Sugar().Warnf("cannot exclude some health checks, no health checks are installed matching %s",
formatQuoted(excluded.List()...))
}
if len(included) > 0 {
fmt.Fprintf(&individualCheckOutput, "warn: some health checks cannot be included: no matches for %s\n", formatQuoted(included.List()...))
lg.Sugar().Warnf("cannot include some health checks, no health checks are installed matching %s",
formatQuoted(included.List()...))
}
// always be verbose on failure
if len(failedChecks) > 0 {
healthStatus.IsOk = false
lg.Sugar().Errorf("%s check failed: %s\n%v", strings.Join(failedChecks, ","), path, failedVerboseLogOutput.String())
}
lg.Sugar().Infof("%s check passed\n", path)
healthStatus.Reason = individualCheckOutput.String()
return healthStatus, nil
}

// handleRootHealth returns an http.HandlerFunc that serves the provided checks.
func (h *HealthHandler) handleRootHealth(lg *zap.Logger, path string, checks []string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// extracts the health check names to be excludeList from the query param
excludeList, _ := r.URL.Query()["exclude"]
// extracts the health check names to be allowList from the query param
allowList, _ := r.URL.Query()["allowlist"]

healthStatus, err := h.checkHealth(lg, path, excludeList, allowList, checks)
if err != nil {
http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
return
}
// always be verbose on failure
if !healthStatus.IsOk {
http.Error(w, fmt.Sprintf("%s%s check failed", healthStatus.Reason, path), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
if _, found := r.URL.Query()["verbose"]; !found {
fmt.Fprint(w, "ok")
return
}
fmt.Fprint(w, healthStatus.Reason)
fmt.Fprintf(w, "%s check passed\n", path)
}
}

// adaptCheckToHandler returns an http.HandlerFunc that serves the provided checks.
func adaptCheckToHandler(c func() error) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := c()
if err != nil {
http.Error(w, fmt.Sprintf("internal server error: %v", err), http.StatusInternalServerError)
} else {
fmt.Fprint(w, "ok")
}
})
}

// formatQuoted returns a formatted string of the health check names,
// preserving the order passed in.
func formatQuoted(names ...string) string {
quoted := make([]string, 0, len(names))
for _, name := range names {
quoted = append(quoted, fmt.Sprintf("%q", name))
}
return strings.Join(quoted, ",")
}

// checkAlarm checks if a specific alarm type is active in the server.
func checkAlarm(srv EtcdServerHealth, at etcdserverpb.AlarmType) error {
as := srv.Alarms()
if len(as) > 0 {
for _, v := range as {
if v.Alarm == at {
return fmt.Errorf("Alarm active:%s", at.String())
}
}
}
return nil
}

type stringSet map[string]struct{}

func (s stringSet) List() []string {
keys := make([]string, len(s))

i := 0
for k := range s {
keys[i] = k
i++
}
return keys
}

func listToStringSet(list []string) stringSet {
set := make(map[string]struct{})
for _, s := range list {
if len(s) == 0 {
continue
}
set[s] = struct{}{}
}
return set
}
Loading

0 comments on commit f5d4788

Please sign in to comment.