From a53af1859f4c15c45fa0c5fac2ccee76a300624d Mon Sep 17 00:00:00 2001
From: Chao Chen
Date: Thu, 26 Oct 2023 13:11:45 -0700
Subject: [PATCH] v3rpc: run health notifier to listen on online defrag state change

Signed-off-by: Chao Chen
---
 server/etcdserver/api/v3client/v3client.go |  2 +-
 server/etcdserver/api/v3rpc/grpc.go        |  4 +-
 server/etcdserver/api/v3rpc/health.go      | 49 ++++++++++++++++------
 server/etcdserver/api/v3rpc/maintenance.go | 26 ++++++++----
 tests/e2e/failover_test.go                 | 35 +++++++++-------
 5 files changed, 77 insertions(+), 39 deletions(-)

diff --git a/server/etcdserver/api/v3client/v3client.go b/server/etcdserver/api/v3client/v3client.go
index b9d18399f2d0..c44479ffad24 100644
--- a/server/etcdserver/api/v3client/v3client.go
+++ b/server/etcdserver/api/v3client/v3client.go
@@ -39,7 +39,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
 	wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
 	c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}
 
-	mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s, nil))
+	mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
 	c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)
 
 	clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go
index 3b0bdd8c825d..6053aa9ba100 100644
--- a/server/etcdserver/api/v3rpc/grpc.go
+++ b/server/etcdserver/api/v3rpc/grpc.go
@@ -75,9 +75,11 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
 	pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
 	pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
 	pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
+	maintenanceServer := NewMaintenanceServer(s)
+	pb.RegisterMaintenanceServer(grpcServer, maintenanceServer)
 
 	hsrv := health.NewServer()
-	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, NewHealthNotifier(hsrv, s.Logger())))
+	NewHealthNotifier(hsrv, s, maintenanceServer.DefragNotify())
 	healthpb.RegisterHealthServer(grpcServer, hsrv)
 
 	// set zero values for metrics registered for this grpc server
diff --git a/server/etcdserver/api/v3rpc/health.go b/server/etcdserver/api/v3rpc/health.go
index 5e84b9b40149..71bf22632c07 100644
--- a/server/etcdserver/api/v3rpc/health.go
+++ b/server/etcdserver/api/v3rpc/health.go
@@ -18,37 +18,57 @@ import (
 	"go.uber.org/zap"
 	"google.golang.org/grpc/health"
 	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+
+	"go.etcd.io/etcd/server/v3/etcdserver"
 )
 
 const (
 	allGRPCServices = ""
 )
 
-type HealthNotifier interface {
-	StartServe()
-	StopServe(reason string)
+type etcdServerHealth interface {
+	StopNotify() <-chan struct{}
 }
 
-func NewHealthNotifier(hs *health.Server, lg *zap.Logger) HealthNotifier {
+func NewHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer, defragNotifier <-chan defragStatus) {
 	if hs == nil {
 		panic("unexpected nil gRPC health server")
 	}
-	if lg == nil {
-		lg = zap.NewNop()
-	}
-	hc := &healthChecker{hs: hs, lg: lg}
+	hc := &healthNotifier{hs: hs, s: s, lg: s.Logger(), defragNotifier: defragNotifier, stopGRPCServiceOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
 	// set grpc health server as serving status blindly since
 	// the grpc server will serve iff s.ReadyNotify() is closed.
-	hc.StartServe()
-	return hc
+	hc.startServe()
+	go hc.watchDefragStatus()
 }
 
-type healthChecker struct {
+type healthNotifier struct {
 	hs *health.Server
+	s  etcdServerHealth
 	lg *zap.Logger
+
+	defragNotifier          <-chan defragStatus
+	stopGRPCServiceOnDefrag bool
 }
 
-func (hc *healthChecker) StartServe() {
+func (hc *healthNotifier) watchDefragStatus() {
+	for {
+		select {
+		case <-hc.s.StopNotify():
+			return
+		case status := <-hc.defragNotifier:
+			switch status {
+			case defragStarted:
+				hc.stopServe("defrag is active")
+			case defragFinished:
+				hc.startServe()
+			default:
+				panic("unsupported defrag status")
+			}
+		}
+	}
+}
+
+func (hc *healthNotifier) startServe() {
 	hc.lg.Info(
 		"grpc service status changed",
 		zap.String("service", allGRPCServices),
@@ -57,7 +77,10 @@ func (hc *healthChecker) StartServe() {
 	hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_SERVING)
 }
 
-func (hc *healthChecker) StopServe(reason string) {
+func (hc *healthNotifier) stopServe(reason string) {
+	if !hc.stopGRPCServiceOnDefrag {
+		return
+	}
 	hc.lg.Warn(
 		"grpc service status changed",
 		zap.String("service", allGRPCServices),
diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go
index b722c44ce911..46d194735f98 100644
--- a/server/etcdserver/api/v3rpc/maintenance.go
+++ b/server/etcdserver/api/v3rpc/maintenance.go
@@ -63,6 +63,13 @@ type ClusterStatusGetter interface {
 	IsLearner() bool
 }
 
+type defragStatus int
+
+const (
+	defragStarted defragStatus = iota
+	defragFinished
+)
+
 type maintenanceServer struct {
 	lg *zap.Logger
 	rg apply.RaftStatusGetter
@@ -74,13 +81,12 @@ type maintenanceServer struct {
 	cs ClusterStatusGetter
 	d  Downgrader
 	vs serverversion.Server
-	hn HealthNotifier
 
-	stopServingOnDefrag bool
+	defragC chan defragStatus
 }
 
-func NewMaintenanceServer(s *etcdserver.EtcdServer, hn HealthNotifier) pb.MaintenanceServer {
-	srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), hn: hn, stopServingOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
+func NewMaintenanceServer(s *etcdserver.EtcdServer) *authMaintenanceServer {
+	srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), defragC: make(chan defragStatus)}
 	if srv.lg == nil {
 		srv.lg = zap.NewNop()
 	}
@@ -89,10 +95,10 @@
 
 func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
 	ms.lg.Info("starting defragment")
-	if ms.stopServingOnDefrag {
-		ms.hn.StopServe("defrag is active")
-		defer ms.hn.StartServe()
-	}
+	ms.defragC <- defragStarted
+	defer func() {
+		ms.defragC <- defragFinished
+	}()
 	err := ms.bg.Backend().Defrag()
 	if err != nil {
 		ms.lg.Warn("failed to defragment", zap.Error(err))
@@ -102,6 +108,10 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
 	return &pb.DefragmentResponse{}, nil
 }
 
+func (ms *maintenanceServer) DefragNotify() <-chan defragStatus {
+	return ms.defragC
+}
+
 // big enough size to hold >1 OS pages in the buffer
 const snapshotSendBufferSize = 32 * 1024
 
diff --git a/tests/e2e/failover_test.go b/tests/e2e/failover_test.go
index b32f8f42d11c..aec467fcc9cc 100644
--- a/tests/e2e/failover_test.go
+++ b/tests/e2e/failover_test.go
@@ -49,11 +49,11 @@ func TestFailoverOnDefrag(t *testing.T) {
 		gRPCDialOptions []grpc.DialOption
 
 		// common assertion
-		expectedMinTotalRequestsCount int
+		expectedMinQPS float64
 		// happy case assertion
-		expectedMaxFailedRequestsCount int
+		expectedMaxFailureRate float64
 		// negative case assertion
-		expectedMinFailedRequestsCount int
+		expectedMinFailureRate float64
 	}{
 		{
 			name: "defrag failover happy case",
@@ -66,8 +66,8 @@ func TestFailoverOnDefrag(t *testing.T) {
 				grpc.WithDisableServiceConfig(),
 				grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
 			},
-			expectedMinTotalRequestsCount:  300,
-			expectedMaxFailedRequestsCount: 5,
+			expectedMinQPS:         20,
+			expectedMaxFailureRate: 0.01,
 		},
 		{
 			name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to false",
@@ -80,8 +80,8 @@ func TestFailoverOnDefrag(t *testing.T) {
 				grpc.WithDisableServiceConfig(),
 				grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
 			},
-			expectedMinTotalRequestsCount:  300,
-			expectedMinFailedRequestsCount: 90,
+			expectedMinQPS:         20,
+			expectedMinFailureRate: 0.25,
 		},
 		{
 			name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled",
@@ -90,8 +90,8 @@ func TestFailoverOnDefrag(t *testing.T) {
 				e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
 				e2e.WithGoFailEnabled(true),
 			},
-			expectedMinTotalRequestsCount:  300,
-			expectedMinFailedRequestsCount: 90,
+			expectedMinQPS:         20,
+			expectedMinFailureRate: 0.25,
 		},
 	}
 
@@ -105,6 +105,7 @@ func TestFailoverOnDefrag(t *testing.T) {
 			endpoints := clus.EndpointsGRPC()
 
 			requestVolume, successfulRequestCount := 0, 0
+			start := time.Now()
 			g := new(errgroup.Group)
 			g.Go(func() (lastErr error) {
 				clusterClient, cerr := clientv3.New(clientv3.Config{
@@ -143,15 +144,17 @@ func TestFailoverOnDefrag(t *testing.T) {
 			if err != nil {
 				t.Logf("etcd client failed to fail over, error (%v)", err)
 			}
-			t.Logf("request failure rate is %.2f%%, traffic volume successfulRequestCount %d requests, total %d requests", (1-float64(successfulRequestCount)/float64(requestVolume))*100, successfulRequestCount, requestVolume)
-			require.GreaterOrEqual(t, requestVolume, tc.expectedMinTotalRequestsCount)
-			failedRequestCount := requestVolume - successfulRequestCount
-			if tc.expectedMaxFailedRequestsCount != 0 {
-				require.LessOrEqual(t, failedRequestCount, tc.expectedMaxFailedRequestsCount)
+			qps := float64(requestVolume) / float64(time.Since(start)) * float64(time.Second)
+			failureRate := 1 - float64(successfulRequestCount)/float64(requestVolume)
+			t.Logf("request failure rate is %.2f%%, qps is %.2f requests/second", failureRate*100, qps)
+
+			require.GreaterOrEqual(t, qps, tc.expectedMinQPS)
+			if tc.expectedMaxFailureRate != 0.0 {
+				require.LessOrEqual(t, failureRate, tc.expectedMaxFailureRate)
 			}
 
-			if tc.expectedMinFailedRequestsCount != 0 {
-				require.GreaterOrEqual(t, failedRequestCount, tc.expectedMinFailedRequestsCount)
+			if tc.expectedMinFailureRate != 0.0 {
+				require.GreaterOrEqual(t, failureRate, tc.expectedMinFailureRate)
 			}
 		})
 	}
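
Note (not part of the patch): the e2e test above only fails over because the client enables gRPC client-side health checking, so requests are steered away from a member whose health server reports NOT_SERVING while defragmentation runs. Below is a minimal sketch of how an application client could opt in; it reuses the exact DialOptions from TestFailoverOnDefrag, while the package name, helper name, endpoints, and timeout are illustrative placeholders and not part of this change.

package failoverclient

import (
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
	// Blank import registers the default client-side health check implementation,
	// as required by gRPC-Go for healthCheckConfig to take effect.
	_ "google.golang.org/grpc/health"
)

// newFailoverAwareClient (hypothetical helper) builds an etcd client that
// watches the "" (all services) health status published by each member and
// lets the round_robin balancer skip members that report NOT_SERVING,
// e.g. while online defragmentation is in progress.
func newFailoverAwareClient(endpoints []string) (*clientv3.Client, error) {
	return clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second, // placeholder value
		DialOptions: []grpc.DialOption{
			grpc.WithDisableServiceConfig(),
			// Same service config string as used in TestFailoverOnDefrag above.
			grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
		},
	})
}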