Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.4] Backport healthcheck code cleanup #17120

Merged
merged 6 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,8 @@ func (e *Etcd) serveClients() (err error) {
} else {
mux := http.NewServeMux()
etcdhttp.HandleBasic(mux, e.Server)
etcdhttp.HandleMetrics(mux)
etcdhttp.HandleHealth(mux, e.Server)
h = mux
}

Expand Down Expand Up @@ -861,7 +863,8 @@ func (e *Etcd) serveMetrics() (err error) {

if len(e.cfg.ListenMetricsUrls) > 0 {
metricsMux := http.NewServeMux()
etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
etcdhttp.HandleMetrics(metricsMux)
etcdhttp.HandleHealth(metricsMux, e.Server)

for _, murl := range e.cfg.ListenMetricsUrls {
tlsInfo := &e.cfg.ClientTLSInfo
Expand Down
2 changes: 1 addition & 1 deletion etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func startProxy(cfg *config) error {
plog.Infof("v2 proxy started listening on client requests on %q", host)
}
mux := http.NewServeMux()
etcdhttp.HandlePrometheus(mux) // v2 proxy just uses the same port
etcdhttp.HandleMetrics(mux) // v2 proxy just uses the same port
mux.Handle("/", ph)
plog.Fatal(http.Serve(l, mux))
}()
Expand Down
1 change: 0 additions & 1 deletion etcdserver/api/etcdhttp/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func HandleBasic(mux *http.ServeMux, server etcdserver.ServerPeer) {
// TODO: deprecate '/config/local/log' in v3.5
mux.HandleFunc(configPath+"/local/log", logHandleFunc)

HandleMetricsHealth(mux, server)
mux.HandleFunc(versionPath, versionHandler(server.Cluster(), serveVersion))
}

Expand Down
223 changes: 223 additions & 0 deletions etcdserver/api/etcdhttp/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdhttp

import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"go.etcd.io/etcd/auth"
"go.etcd.io/etcd/etcdserver"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/pkg/types"
"go.etcd.io/etcd/raft"

"github.com/prometheus/client_golang/prometheus"
)

// PathHealth is the URL path on which the health handler is registered.
const PathHealth = "/health"

type ServerHealth interface {
serverHealthV2V3
Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error)
Config() etcdserver.ServerConfig
}

type serverHealthV2V3 interface {
Alarms() []*pb.AlarmMember
Leader() types.ID
}

// HandleHealthForV2 registers the health handler for a v2 server on mux
// under PathHealth. The check runs in three stages (alarms, leader, then a
// v2 QGET request) and stops at the first stage that reports unhealthy.
func HandleHealthForV2(mux *http.ServeMux, srv etcdserver.ServerV2) {
	check := func(excludedAlarms AlarmSet, serializable bool) Health {
		status := checkAlarms(srv, excludedAlarms)
		if status.Health == "true" {
			status = checkLeader(srv, serializable)
		}
		if status.Health == "true" {
			status = checkV2API(srv)
		}
		return status
	}
	mux.Handle(PathHealth, NewHealthHandler(check))
}

// HandleHealth registers the health handler on mux under PathHealth.
// The check runs in three stages (alarms, leader, then a v3 range request
// bounded by the server's request timeout) and stops at the first stage
// that reports unhealthy.
func HandleHealth(mux *http.ServeMux, srv ServerHealth) {
	check := func(excludedAlarms AlarmSet, serializable bool) Health {
		status := checkAlarms(srv, excludedAlarms)
		if status.Health == "true" {
			status = checkLeader(srv, serializable)
		}
		if status.Health == "true" {
			status = checkAPI(srv, serializable)
		}
		return status
	}
	mux.Handle(PathHealth, NewHealthHandler(check))
}

// NewHealthHandler returns an http.HandlerFunc that serves '/health'
// requests by delegating the actual check to hfunc.
//
// Only GET is accepted; any other method gets 405 with an "Allow: GET"
// header. For GET, hfunc receives the alarms listed in the "exclude" query
// parameter and the "serializable" flag. The resulting Health is
// JSON-encoded and returned with 200 when healthy, or with 503 (through
// http.Error) otherwise. The matching success/failure counter is
// incremented once the response has been handled.
func NewHealthHandler(hfunc func(excludedAlarms AlarmSet, serializable bool) Health) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			w.Header().Set("Allow", http.MethodGet)
			http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
			plog.Warningf("/health error (status code %d)", http.StatusMethodNotAllowed)
			return
		}

		// With "serializable=true" the check targets the local member
		// rather than the cluster, which suits liveness probes of the
		// etcd process as opposed to readiness of the cluster to serve
		// requests.
		result := hfunc(getExcludedAlarms(r), getSerializableFlag(r))
		healthy := result.Health == "true"

		// Deferred so the counter is bumped on every exit path below.
		defer func() {
			counter := healthFailed
			if healthy {
				counter = healthSuccess
			}
			counter.Inc()
		}()

		body, _ := json.Marshal(result)
		if !healthy {
			http.Error(w, string(body), http.StatusServiceUnavailable)
			return
		}

		w.WriteHeader(http.StatusOK)
		w.Write(body)
		plog.Debugf("/health OK (status code %d)", http.StatusOK)
	}
}

// healthSuccess counts health checks that reported the server healthy.
var healthSuccess = prometheus.NewCounter(
	prometheus.CounterOpts{
		Namespace: "etcd",
		Subsystem: "server",
		Name:      "health_success",
		Help:      "The total number of successful health checks",
	},
)

// healthFailed counts health checks that reported the server unhealthy.
var healthFailed = prometheus.NewCounter(
	prometheus.CounterOpts{
		Namespace: "etcd",
		Subsystem: "server",
		Name:      "health_failures",
		Help:      "The total number of failed health checks",
	},
)

// init registers the health-check counters with the default Prometheus
// registry; MustRegister panics if a registration fails.
func init() {
	prometheus.MustRegister(healthSuccess, healthFailed)
}

// Health defines etcd server health status.
// It is the value produced by the health check functions and JSON-encoded
// into the '/health' response body.
// TODO: remove manual parsing in etcdctl cluster-health
type Health struct {
// Health is the string "true" when the check passed and "false" otherwise;
// it is the only field emitted in the JSON output (as "health").
Health string `json:"health"`
// Reason is a short description of why the check failed (for example
// "ALARM NOSPACE" or "RAFT NO LEADER"). The `json:"-"` tag keeps it out of
// the encoded response.
Reason string `json:"-"`
}

// AlarmSet is a set of alarm names, keyed by the string form of the alarm
// type (for example "NOSPACE").
type AlarmSet map[string]struct{}

// getExcludedAlarms collects the alarm names passed through the "exclude"
// query parameter of r (the parameter may be repeated). The result is
// always a non-nil set; it is empty when the parameter is absent.
//
// Empty values such as "?exclude=" are skipped. The previous guard tested
// the length of the whole value slice instead of the individual value, so
// it never fired and an empty-string key ended up in the set.
func getExcludedAlarms(r *http.Request) (alarms AlarmSet) {
	alarms = make(AlarmSet, 2)
	// Ranging over a missing key (nil slice) is a no-op, so no separate
	// presence check is needed.
	for _, alm := range r.URL.Query()["exclude"] {
		if alm == "" {
			continue
		}
		alarms[alm] = struct{}{}
	}
	return alarms
}

// getSerializableFlag reports whether the first "serializable" query
// parameter of r is exactly the string "true". A missing parameter or any
// other value yields false.
func getSerializableFlag(r *http.Request) bool {
	values := r.URL.Query()["serializable"]
	return len(values) > 0 && values[0] == "true"
}

// TODO: etcdserver.ErrNoLeader in health API

// checkAlarms reports unhealthy as soon as it finds an active alarm on srv
// whose name is not in excludedAlarms. The Reason is "ALARM NOSPACE" or
// "ALARM CORRUPT" for those alarm types and "ALARM UNKNOWN" for any other.
// When no alarm is active, or all are excluded, the result is healthy.
func checkAlarms(srv serverHealthV2V3, excludedAlarms AlarmSet) Health {
	for _, member := range srv.Alarms() {
		if _, skip := excludedAlarms[member.Alarm.String()]; skip {
			plog.Debugf("/health excluded alarm %s", member.String())
			continue
		}

		reason := "ALARM UNKNOWN"
		switch member.Alarm {
		case pb.AlarmType_NOSPACE:
			reason = "ALARM NOSPACE"
		case pb.AlarmType_CORRUPT:
			reason = "ALARM CORRUPT"
		}
		plog.Warningf("/health error due to %s", member.String())
		return Health{Health: "false", Reason: reason}
	}

	return Health{Health: "true"}
}

// checkLeader reports unhealthy with Reason "RAFT NO LEADER" when srv has
// no leader (its leader ID equals raft.None). The check is skipped, and
// the result is healthy, when serializable is true.
func checkLeader(srv serverHealthV2V3, serializable bool) Health {
	if serializable || uint64(srv.Leader()) != raft.None {
		return Health{Health: "true"}
	}

	plog.Warningf("/health error; no leader (status code %d)", http.StatusServiceUnavailable)
	return Health{Health: "false", Reason: "RAFT NO LEADER"}
}

// checkV2API probes the v2 API by issuing a "QGET" request against srv
// with a one-second timeout. Any error marks the server unhealthy and is
// included in the Reason.
func checkV2API(srv etcdserver.ServerV2) Health {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err := srv.Do(ctx, pb.Request{Method: "QGET"})
	cancel()

	if err == nil {
		return Health{Health: "true"}
	}

	reason := fmt.Sprintf("QGET ERROR:%s", err)
	plog.Warningf("/health error; QGET failed %v (status code %d)", err, http.StatusServiceUnavailable)
	return Health{Health: "false", Reason: reason}
}

// checkAPI probes the v3 API by issuing a keys-only, limit-1 range request
// against srv, bounded by the server's configured request timeout. The
// serializable flag is forwarded to the request.
//
// auth.ErrUserEmpty and auth.ErrPermissionDenied are not treated as
// failures (presumably because they show the request was processed and
// only rejected by auth; confirm). Any other error marks the server
// unhealthy and is included in the Reason.
func checkAPI(srv ServerHealth, serializable bool) Health {
	cfg := srv.Config()
	ctx, cancel := context.WithTimeout(context.Background(), cfg.ReqTimeout())
	req := &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable}
	_, err := srv.Range(ctx, req)
	cancel()

	switch err {
	case nil, auth.ErrUserEmpty, auth.ErrPermissionDenied:
		return Health{Health: "true"}
	}

	reason := fmt.Sprintf("RANGE ERROR:%s", err)
	plog.Warningf("serving /health false; Range failed %v (status code %d)", err, http.StatusServiceUnavailable)
	return Health{Health: "false", Reason: reason}
}
Loading
Loading