v3rpc: online defrag toggles gRPC health server serving status #16836

Merged
merged 1 commit into from
Oct 29, 2023
3 changes: 2 additions & 1 deletion server/etcdserver/api/v3rpc/grpc.go
@@ -77,8 +77,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))

hsrv := health.NewServer()
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, NewHealthNotifier(hsrv, s.Logger())))
healthNotifier := newHealthNotifier(hsrv, s)
healthpb.RegisterHealthServer(grpcServer, hsrv)
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, healthNotifier))

// set zero values for metrics registered for this grpc server
grpc_prometheus.Register(grpcServer)
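For context, the wiring above uses grpc-go's stock health service. Below is a minimal standalone sketch of that API (not etcd code; the server setup is illustrative), showing the two calls the rest of this PR builds on:

```go
package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	grpcServer := grpc.NewServer()

	// Register the standard health service, as grpc.go does with hsrv.
	hs := health.NewServer()
	healthpb.RegisterHealthServer(grpcServer, hs)

	// The empty service name covers the whole server; clients that set
	// healthCheckConfig.serviceName to "" watch this entry.
	hs.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)

	// Flipping the status is what the healthNotifier below does around a
	// defragmentation; health-checking clients then drop this endpoint.
	hs.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
}
```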
34 changes: 22 additions & 12 deletions server/etcdserver/api/v3rpc/health.go
@@ -18,37 +18,47 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.etcd.io/etcd/server/v3/etcdserver"
)

const (
allGRPCServices = ""
)

type HealthNotifier interface {
StartServe()
StopServe(reason string)
type notifier interface {
Member
Nit: The previous name HealthNotifier makes more sense than notifier, because it isn't defined inside the "health" package.

Overall this PR is a little over-engineered. Both (*maintenanceServer) Defragment and (*healthChecker) StopServe are in the same package api/v3rpc, so we don't necessarily have to introduce or refine the interface for now.

Anyway, not a big deal for now.

Member
On the other hand, we should avoid the other extreme. The fact that structs are in a single package doesn't mean they should have access to all internal methods. Having an internal interface doesn't complicate the code much, but it clarifies the intended usage.

defragStarted()
defragFinished()
}

func NewHealthNotifier(hs *health.Server, lg *zap.Logger) HealthNotifier {
func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier {
if hs == nil {
panic("unexpected nil gRPC health server")
}
if lg == nil {
lg = zap.NewNop()
}
hc := &healthChecker{hs: hs, lg: lg}
hc := &healthNotifier{hs: hs, lg: s.Logger(), stopGRPCServiceOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
// set grpc health server as serving status blindly since
// the grpc server will serve iff s.ReadyNotify() is closed.
hc.StartServe()
hc.startServe()
return hc
}

type healthChecker struct {
type healthNotifier struct {
hs *health.Server
lg *zap.Logger

stopGRPCServiceOnDefrag bool
}

func (hc *healthChecker) StartServe() {
func (hc *healthNotifier) defragStarted() {
if !hc.stopGRPCServiceOnDefrag {
return
}
hc.stopServe("defrag is active")
}

func (hc *healthNotifier) defragFinished() { hc.startServe() }

func (hc *healthNotifier) startServe() {
hc.lg.Info(
"grpc service status changed",
zap.String("service", allGRPCServices),
@@ -57,7 +67,7 @@ func (hc *healthChecker) StartServe() {
hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_SERVING)
}

func (hc *healthChecker) StopServe(reason string) {
func (hc *healthNotifier) stopServe(reason string) {
hc.lg.Warn(
"grpc service status changed",
zap.String("service", allGRPCServices),
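A sketch of how this toggle is visible from outside the server, by querying the health service's Check RPC directly. The endpoint, absence of TLS, and timeout are assumptions for illustration only:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	conn, err := grpc.Dial("localhost:2379", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Query the "" (all services) entry that healthNotifier toggles.
	resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
	if err != nil {
		panic(err)
	}
	// Expected to be SERVING normally, and NOT_SERVING while a defrag is
	// running with ExperimentalStopGRPCServiceOnDefrag enabled.
	fmt.Println(resp.Status)
}
```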
13 changes: 5 additions & 8 deletions server/etcdserver/api/v3rpc/maintenance.go
@@ -74,13 +74,12 @@ type maintenanceServer struct
cs ClusterStatusGetter
d Downgrader
vs serverversion.Server
hn HealthNotifier

stopServingOnDefrag bool
healthNotifier notifier
}

func NewMaintenanceServer(s *etcdserver.EtcdServer, hn HealthNotifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), hn: hn, stopServingOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier notifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), healthNotifier: healthNotifier}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
@@ -89,10 +88,8 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer, hn HealthNotifier) pb.Mainte

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
if ms.stopServingOnDefrag {
ms.hn.StopServe("defrag is active")
defer ms.hn.StartServe()
}
ms.healthNotifier.defragStarted()
defer ms.healthNotifier.defragFinished()
err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
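The Defragment path above is reached by an ordinary maintenance call. A hedged sketch using clientv3's Maintenance API; the endpoint and timeout are illustrative:

```go
package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// Defragment runs against a single endpoint; while it holds the backend,
	// that member's maintenanceServer reports NOT_SERVING (when the option is
	// enabled), so health-checking clients route around it.
	if _, err := cli.Defragment(ctx, "localhost:2379"); err != nil {
		log.Fatal(err)
	}
}
```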
35 changes: 19 additions & 16 deletions tests/e2e/failover_test.go
@@ -49,11 +49,11 @@ func TestFailoverOnDefrag(t *testing.T) {
gRPCDialOptions []grpc.DialOption

// common assertion
expectedMinTotalRequestsCount int
expectedMinQPS float64
// happy case assertion
expectedMaxFailedRequestsCount int
expectedMaxFailureRate float64
// negative case assertion
expectedMinFailedRequestsCount int
expectedMinFailureRate float64
}{
{
name: "defrag failover happy case",
@@ -66,8 +66,8 @@ func TestFailoverOnDefrag(t *testing.T) {
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinTotalRequestsCount: 300,
expectedMaxFailedRequestsCount: 5,
expectedMinQPS: 20,
expectedMaxFailureRate: 0.01,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to false",
@@ -80,8 +80,8 @@ func TestFailoverOnDefrag(t *testing.T) {
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinTotalRequestsCount: 300,
expectedMinFailedRequestsCount: 90,
expectedMinQPS: 20,
expectedMinFailureRate: 0.25,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled",
@@ -90,8 +90,8 @@ func TestFailoverOnDefrag(t *testing.T) {
e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
e2e.WithGoFailEnabled(true),
},
expectedMinTotalRequestsCount: 300,
expectedMinFailedRequestsCount: 90,
expectedMinQPS: 20,
expectedMinFailureRate: 0.25,
},
}

@@ -105,6 +105,7 @@ func TestFailoverOnDefrag(t *testing.T) {
endpoints := clus.EndpointsGRPC()

requestVolume, successfulRequestCount := 0, 0
start := time.Now()
g := new(errgroup.Group)
g.Go(func() (lastErr error) {
clusterClient, cerr := clientv3.New(clientv3.Config{
@@ -143,15 +144,17 @@ func TestFailoverOnDefrag(t *testing.T) {
if err != nil {
t.Logf("etcd client failed to fail over, error (%v)", err)
}
t.Logf("request failure rate is %.2f%%, traffic volume successfulRequestCount %d requests, total %d requests", (1-float64(successfulRequestCount)/float64(requestVolume))*100, successfulRequestCount, requestVolume)

require.GreaterOrEqual(t, requestVolume, tc.expectedMinTotalRequestsCount)
failedRequestCount := requestVolume - successfulRequestCount
if tc.expectedMaxFailedRequestsCount != 0 {
require.LessOrEqual(t, failedRequestCount, tc.expectedMaxFailedRequestsCount)
qps := float64(requestVolume) / float64(time.Since(start)) * float64(time.Second)
failureRate := 1 - float64(successfulRequestCount)/float64(requestVolume)
t.Logf("request failure rate is %.2f%%, qps is %.2f requests/second", failureRate*100, qps)

require.GreaterOrEqual(t, qps, tc.expectedMinQPS)
if tc.expectedMaxFailureRate != 0.0 {
require.LessOrEqual(t, failureRate, tc.expectedMaxFailureRate)
}
if tc.expectedMinFailedRequestsCount != 0 {
require.GreaterOrEqual(t, failedRequestCount, tc.expectedMinFailedRequestsCount)
if tc.expectedMinFailureRate != 0.0 {
require.GreaterOrEqual(t, failureRate, tc.expectedMinFailureRate)
}
})
}
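Outside the e2e harness, the happy-path case above corresponds roughly to the following clientv3 setup. The endpoints are illustrative, and the blank google.golang.org/grpc/health import is assumed necessary so grpc-go registers its client-side health-check function:

```go
package main

import (
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
	_ "google.golang.org/grpc/health"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
		DialTimeout: 5 * time.Second,
		DialOptions: []grpc.DialOption{
			// Same options as the test: round_robin balancing plus client-side
			// health checking against the "" service the server toggles.
			grpc.WithDisableServiceConfig(),
			grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	// ... issue Get/Put traffic; requests avoid the member that reports
	// NOT_SERVING while it is being defragmented.
}
```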