From ebb7e796c3cc83f99e7837e060159f8ff991ee90 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Mon, 20 Nov 2023 09:58:40 -0800 Subject: [PATCH] etcdserver: add linearizable_read check to readyz. Signed-off-by: Siyuan Zhang --- server/etcdserver/api/etcdhttp/health.go | 21 ++--- server/etcdserver/api/etcdhttp/health_test.go | 76 +++++++++++++++---- 2 files changed, 72 insertions(+), 25 deletions(-) diff --git a/server/etcdserver/api/etcdhttp/health.go b/server/etcdserver/api/etcdhttp/health.go index a442228012b..0a933429122 100644 --- a/server/etcdserver/api/etcdhttp/health.go +++ b/server/etcdserver/api/etcdhttp/health.go @@ -23,10 +23,9 @@ import ( "encoding/json" "fmt" "net/http" - "time" - "path" "strings" + "time" "go.uber.org/zap" @@ -276,14 +275,19 @@ type CheckRegistry struct { func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)} - reg.Register("serializable_read", serializableReadCheck(server)) + reg.Register("serializable_read", readCheck(server, true /* serializable */)) reg.InstallHttpEndpoints(lg, mux) } func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)} reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT)) - reg.Register("serializable_read", serializableReadCheck(server)) + // serializable_read checks if local read is ok. + // linearizable_read checks if there is consensus in the cluster. + // Having both serializable_read and linearizable_read helps isolate the cause of problems if there is a read failure. + reg.Register("serializable_read", readCheck(server, true)) + // linearizable_read check would be replaced by read_index check in 3.6 + reg.Register("linearizable_read", readCheck(server, false)) reg.InstallHttpEndpoints(lg, mux) } @@ -447,13 +451,10 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e } } -func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error { +func readCheck(srv ServerHealth, serializable bool) func(ctx context.Context) error { return func(ctx context.Context) error { ctx = srv.AuthStore().WithRoot(ctx) - _, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true}) - if err != nil { - return fmt.Errorf("range error: %w", err) - } - return nil + _, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable}) + return err } } diff --git a/server/etcdserver/api/etcdhttp/health_test.go b/server/etcdserver/api/etcdhttp/health_test.go index bb1fbd5cbf1..0019121755a 100644 --- a/server/etcdserver/api/etcdhttp/health_test.go +++ b/server/etcdserver/api/etcdhttp/health_test.go @@ -23,13 +23,17 @@ import ( type fakeHealthServer struct { fakeServer - apiError error - missingLeader bool - authStore auth.AuthStore + serializableReadError error + linearizableReadError error + missingLeader bool + authStore auth.AuthStore } -func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) { - return nil, s.apiError +func (s *fakeHealthServer) Range(_ context.Context, req *pb.RangeRequest) (*pb.RangeResponse, error) { + if req.Serializable { + return nil, s.serializableReadError + } + return nil, s.linearizableReadError } func (s *fakeHealthServer) Config() config.ServerConfig { @@ -132,10 +136,11 @@ func TestHealthHandler(t *testing.T) { be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be) HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{ - fakeServer: fakeServer{alarms: tt.alarms}, - apiError: tt.apiError, - missingLeader: tt.missingLeader, - authStore: auth.NewAuthStore(lg, be, nil, 0), + fakeServer: fakeServer{alarms: tt.alarms}, + serializableReadError: tt.apiError, + linearizableReadError: tt.apiError, + missingLeader: tt.missingLeader, + authStore: auth.NewAuthStore(lg, be, nil, 0), }) ts := httptest.NewServer(mux) defer ts.Close() @@ -171,8 +176,8 @@ func TestHttpSubPath(t *testing.T) { mux := http.NewServeMux() logger := zaptest.NewLogger(t) s := &fakeHealthServer{ - apiError: tt.apiError, - authStore: auth.NewAuthStore(logger, be, nil, 0), + serializableReadError: tt.apiError, + authStore: auth.NewAuthStore(logger, be, nil, 0), } HandleHealth(logger, mux, s) ts := httptest.NewServer(mux) @@ -255,14 +260,14 @@ func TestSerializableReadCheck(t *testing.T) { healthCheckURL: "/livez", apiError: fmt.Errorf("Unexpected error"), expectStatusCode: http.StatusServiceUnavailable, - inResult: []string{"[-]serializable_read failed: range error: Unexpected error"}, + inResult: []string{"[-]serializable_read failed: Unexpected error"}, }, { name: "Not ready if range api is not available", healthCheckURL: "/readyz", apiError: fmt.Errorf("Unexpected error"), expectStatusCode: http.StatusServiceUnavailable, - inResult: []string{"[-]serializable_read failed: range error: Unexpected error"}, + inResult: []string{"[-]serializable_read failed: Unexpected error"}, }, } for _, tt := range tests { @@ -270,8 +275,8 @@ func TestSerializableReadCheck(t *testing.T) { mux := http.NewServeMux() logger := zaptest.NewLogger(t) s := &fakeHealthServer{ - apiError: tt.apiError, - authStore: auth.NewAuthStore(logger, be, nil, 0), + serializableReadError: tt.apiError, + authStore: auth.NewAuthStore(logger, be, nil, 0), } HandleHealth(logger, mux, s) ts := httptest.NewServer(mux) @@ -282,6 +287,47 @@ func TestSerializableReadCheck(t *testing.T) { } } +func TestLinearizableReadCheck(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "Alive normal", + healthCheckURL: "/livez?verbose", + expectStatusCode: http.StatusOK, + inResult: []string{"[+]serializable_read ok"}, + }, + { + name: "Alive if lineariable range api is not available", + healthCheckURL: "/livez", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusOK, + }, + { + name: "Not ready if range api is not available", + healthCheckURL: "/readyz", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[+]serializable_read ok", "[-]linearizable_read failed: Unexpected error"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + linearizableReadError: tt.apiError, + authStore: auth.NewAuthStore(logger, be, nil, 0), + } + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode) + }) + } +} + func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) { res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})