Skip to content

Commit

Permalink
v3rpc: run health notifier to listen on online defrag state change
Browse files Browse the repository at this point in the history
Signed-off-by: Chao Chen <[email protected]>
  • Loading branch information
chaochn47 committed Oct 27, 2023
1 parent 2dc6275 commit a53af18
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 39 deletions.
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3client/v3client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}

mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s, nil))
mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)

clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
Expand Down
4 changes: 3 additions & 1 deletion server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
maintenanceServer := NewMaintenanceServer(s)
pb.RegisterMaintenanceServer(grpcServer, maintenanceServer)

hsrv := health.NewServer()
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, NewHealthNotifier(hsrv, s.Logger())))
NewHealthNotifier(hsrv, s, maintenanceServer.DefragNotify())
healthpb.RegisterHealthServer(grpcServer, hsrv)

// set zero values for metrics registered for this grpc server
Expand Down
49 changes: 36 additions & 13 deletions server/etcdserver/api/v3rpc/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,57 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.etcd.io/etcd/server/v3/etcdserver"
)

const (
allGRPCServices = ""
)

type HealthNotifier interface {
StartServe()
StopServe(reason string)
type etcdServerHealth interface {
StopNotify() <-chan struct{}
}

func NewHealthNotifier(hs *health.Server, lg *zap.Logger) HealthNotifier {
func NewHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer, defragNotifier <-chan defragStatus) {
if hs == nil {
panic("unexpected nil gRPC health server")
}
if lg == nil {
lg = zap.NewNop()
}
hc := &healthChecker{hs: hs, lg: lg}
hc := &healthNotifier{hs: hs, s: s, lg: s.Logger(), defragNotifier: defragNotifier, stopGRPCServiceOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
// set grpc health server as serving status blindly since
// the grpc server will serve iff s.ReadyNotify() is closed.
hc.StartServe()
return hc
hc.startServe()
go hc.watchDefragStatus()
}

type healthChecker struct {
type healthNotifier struct {
hs *health.Server
s etcdServerHealth
lg *zap.Logger

defragNotifier <-chan defragStatus
stopGRPCServiceOnDefrag bool
}

func (hc *healthChecker) StartServe() {
func (hc *healthNotifier) watchDefragStatus() {
for {
select {
case <-hc.s.StopNotify():
return
case status := <-hc.defragNotifier:
switch status {
case defragStarted:
hc.stopServe("defrag is active")
case defragFinished:
hc.startServe()
default:
panic("unsupported defrag status")
}
}
}
}

func (hc *healthNotifier) startServe() {
hc.lg.Info(
"grpc service status changed",
zap.String("service", allGRPCServices),
Expand All @@ -57,7 +77,10 @@ func (hc *healthChecker) StartServe() {
hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_SERVING)
}

func (hc *healthChecker) StopServe(reason string) {
func (hc *healthNotifier) stopServe(reason string) {
if !hc.stopGRPCServiceOnDefrag {
return
}
hc.lg.Warn(
"grpc service status changed",
zap.String("service", allGRPCServices),
Expand Down
26 changes: 18 additions & 8 deletions server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ type ClusterStatusGetter interface {
IsLearner() bool
}

type defragStatus int

const (
defragStarted defragStatus = iota
defragFinished
)

type maintenanceServer struct {
lg *zap.Logger
rg apply.RaftStatusGetter
Expand All @@ -74,13 +81,12 @@ type maintenanceServer struct {
cs ClusterStatusGetter
d Downgrader
vs serverversion.Server
hn HealthNotifier

stopServingOnDefrag bool
defragC chan defragStatus
}

func NewMaintenanceServer(s *etcdserver.EtcdServer, hn HealthNotifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), hn: hn, stopServingOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
func NewMaintenanceServer(s *etcdserver.EtcdServer) *authMaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), defragC: make(chan defragStatus)}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
Expand All @@ -89,10 +95,10 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer, hn HealthNotifier) pb.Mainte

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
if ms.stopServingOnDefrag {
ms.hn.StopServe("defrag is active")
defer ms.hn.StartServe()
}
ms.defragC <- defragStarted
defer func() {
ms.defragC <- defragFinished
}()
err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
Expand All @@ -102,6 +108,10 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
return &pb.DefragmentResponse{}, nil
}

func (ms *maintenanceServer) DefragNotify() <-chan defragStatus {
return ms.defragC
}

// big enough size to hold >1 OS pages in the buffer
const snapshotSendBufferSize = 32 * 1024

Expand Down
35 changes: 19 additions & 16 deletions tests/e2e/failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func TestFailoverOnDefrag(t *testing.T) {
gRPCDialOptions []grpc.DialOption

// common assertion
expectedMinTotalRequestsCount int
expectedMinQPS float64
// happy case assertion
expectedMaxFailedRequestsCount int
expectedMaxFailureRate float64
// negative case assertion
expectedMinFailedRequestsCount int
expectedMinFailureRate float64
}{
{
name: "defrag failover happy case",
Expand All @@ -66,8 +66,8 @@ func TestFailoverOnDefrag(t *testing.T) {
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinTotalRequestsCount: 300,
expectedMaxFailedRequestsCount: 5,
expectedMinQPS: 20,
expectedMaxFailureRate: 0.01,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to false",
Expand All @@ -80,8 +80,8 @@ func TestFailoverOnDefrag(t *testing.T) {
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinTotalRequestsCount: 300,
expectedMinFailedRequestsCount: 90,
expectedMinQPS: 20,
expectedMinFailureRate: 0.25,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled",
Expand All @@ -90,8 +90,8 @@ func TestFailoverOnDefrag(t *testing.T) {
e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
e2e.WithGoFailEnabled(true),
},
expectedMinTotalRequestsCount: 300,
expectedMinFailedRequestsCount: 90,
expectedMinQPS: 20,
expectedMinFailureRate: 0.25,
},
}

Expand All @@ -105,6 +105,7 @@ func TestFailoverOnDefrag(t *testing.T) {
endpoints := clus.EndpointsGRPC()

requestVolume, successfulRequestCount := 0, 0
start := time.Now()
g := new(errgroup.Group)
g.Go(func() (lastErr error) {
clusterClient, cerr := clientv3.New(clientv3.Config{
Expand Down Expand Up @@ -143,15 +144,17 @@ func TestFailoverOnDefrag(t *testing.T) {
if err != nil {
t.Logf("etcd client failed to fail over, error (%v)", err)
}
t.Logf("request failure rate is %.2f%%, traffic volume successfulRequestCount %d requests, total %d requests", (1-float64(successfulRequestCount)/float64(requestVolume))*100, successfulRequestCount, requestVolume)

require.GreaterOrEqual(t, requestVolume, tc.expectedMinTotalRequestsCount)
failedRequestCount := requestVolume - successfulRequestCount
if tc.expectedMaxFailedRequestsCount != 0 {
require.LessOrEqual(t, failedRequestCount, tc.expectedMaxFailedRequestsCount)
qps := float64(requestVolume) / float64(time.Since(start)) * float64(time.Second)
failureRate := 1 - float64(successfulRequestCount)/float64(requestVolume)
t.Logf("request failure rate is %.2f%%, qps is %.2f requests/second", failureRate*100, qps)

require.GreaterOrEqual(t, qps, tc.expectedMinQPS)
if tc.expectedMaxFailureRate != 0.0 {
require.LessOrEqual(t, failureRate, tc.expectedMaxFailureRate)
}
if tc.expectedMinFailedRequestsCount != 0 {
require.GreaterOrEqual(t, failedRequestCount, tc.expectedMinFailedRequestsCount)
if tc.expectedMinFailureRate != 0.0 {
require.GreaterOrEqual(t, failureRate, tc.expectedMinFailureRate)
}
})
}
Expand Down

0 comments on commit a53af18

Please sign in to comment.