Skip to content

Commit

Permalink
Merge pull request #16984 from siyuanfoundation/lin-read
Browse files Browse the repository at this point in the history
etcdserver: add linearizable_read check to readyz.
  • Loading branch information
ahrtr authored Nov 21, 2023
2 parents 16e4416 + 12b6405 commit fd0882b
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 28 deletions.
18 changes: 10 additions & 8 deletions server/etcdserver/api/etcdhttp/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,19 @@ type CheckRegistry struct {

func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", serializableReadCheck(server))
reg.Register("serializable_read", readCheck(server, true /* serializable */))
reg.InstallHttpEndpoints(lg, mux)
}

func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)}
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
reg.Register("serializable_read", serializableReadCheck(server))
// serializable_read checks if local read is ok.
// linearizable_read checks if there is consensus in the cluster.
// Having both serializable_read and linearizable_read helps isolate the cause of problems if there is a read failure.
reg.Register("serializable_read", readCheck(server, true))
// linearizable_read check would be replaced by read_index check in 3.6
reg.Register("linearizable_read", readCheck(server, false))
reg.InstallHttpEndpoints(lg, mux)
}

Expand Down Expand Up @@ -410,13 +415,10 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e
}
}

func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
func readCheck(srv ServerHealth, serializable bool) func(ctx context.Context) error {
return func(ctx context.Context) error {
ctx = srv.AuthStore().WithRoot(ctx)
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
if err != nil {
return fmt.Errorf("range error: %w", err)
}
return nil
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
return err
}
}
76 changes: 61 additions & 15 deletions server/etcdserver/api/etcdhttp/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ import (

type fakeHealthServer struct {
fakeServer
apiError error
missingLeader bool
authStore auth.AuthStore
serializableReadError error
linearizableReadError error
missingLeader bool
authStore auth.AuthStore
}

func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
return nil, s.apiError
func (s *fakeHealthServer) Range(_ context.Context, req *pb.RangeRequest) (*pb.RangeResponse, error) {
if req.Serializable {
return nil, s.serializableReadError
}
return nil, s.linearizableReadError
}

func (s *fakeHealthServer) Config() config.ServerConfig {
Expand Down Expand Up @@ -148,10 +152,11 @@ func TestHealthHandler(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{
fakeServer: fakeServer{alarms: tt.alarms},
apiError: tt.apiError,
missingLeader: tt.missingLeader,
authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0),
fakeServer: fakeServer{alarms: tt.alarms},
serializableReadError: tt.apiError,
linearizableReadError: tt.apiError,
missingLeader: tt.missingLeader,
authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0),
})
ts := httptest.NewServer(mux)
defer ts.Close()
Expand Down Expand Up @@ -187,8 +192,8 @@ func TestHttpSubPath(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
serializableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
Expand Down Expand Up @@ -271,23 +276,23 @@ func TestSerializableReadCheck(t *testing.T) {
healthCheckURL: "/livez",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
inResult: []string{"[-]serializable_read failed: Unexpected error"},
},
{
name: "Not ready if range api is not available",
healthCheckURL: "/readyz",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
inResult: []string{"[-]serializable_read failed: Unexpected error"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
serializableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
Expand All @@ -298,6 +303,47 @@ func TestSerializableReadCheck(t *testing.T) {
}
}

func TestLinearizableReadCheck(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
tests := []healthTestCase{
{
name: "Alive normal",
healthCheckURL: "/livez?verbose",
expectStatusCode: http.StatusOK,
inResult: []string{"[+]serializable_read ok"},
},
{
name: "Alive if lineariable range api is not available",
healthCheckURL: "/livez",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusOK,
},
{
name: "Not ready if range api is not available",
healthCheckURL: "/readyz",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[+]serializable_read ok", "[-]linearizable_read failed: Unexpected error"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
linearizableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode)
})
}
}

func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})

Expand Down
42 changes: 37 additions & 5 deletions tests/e2e/http_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,22 +226,54 @@ func TestHTTPLivezReadyzHandler(t *testing.T) {
name: "overloaded server slow apply",
injectFailure: triggerSlowApply,
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithGoFailEnabled(true)},
healthChecks: defaultHealthCheckConfigs,
// TODO expected behavior of readyz check should be 200 after ReadIndex check is implemented to replace linearizable read.
healthChecks: []healthCheckConfig{
{
url: "/livez",
expectedStatusCode: http.StatusOK,
},
{
url: "/readyz",
expectedTimeoutError: true,
expectedStatusCode: http.StatusServiceUnavailable,
},
},
},
{
name: "network partitioned",
injectFailure: blackhole,
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithIsPeerTLS(true), e2e.WithPeerProxy(true)},
// TODO expected behavior of readyz check should be 503 or timeout after ReadIndex check is implemented.
healthChecks: defaultHealthCheckConfigs,
healthChecks: []healthCheckConfig{
{
url: "/livez",
expectedStatusCode: http.StatusOK,
},
{
url: "/readyz",
expectedTimeoutError: true,
expectedStatusCode: http.StatusServiceUnavailable,
expectedRespSubStrings: []string{
`[-]linearizable_read failed: etcdserver: leader changed`,
},
},
},
},
{
name: "raft loop deadlock",
injectFailure: triggerRaftLoopDeadLock,
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true)},
// TODO expected behavior of livez check should be 503 or timeout after RaftLoopDeadLock check is implemented.
// TODO expected behavior of readyz check should be 503 or timeout after ReadIndex check is implemented.
healthChecks: defaultHealthCheckConfigs,
healthChecks: []healthCheckConfig{
{
url: "/livez",
expectedStatusCode: http.StatusOK,
},
{
url: "/readyz",
expectedTimeoutError: true,
expectedStatusCode: http.StatusServiceUnavailable,
},
},
},
// verify that auth enabled serializable read must go through mvcc
{
Expand Down

0 comments on commit fd0882b

Please sign in to comment.