Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC health server sets serving status to NOT_SERVING on defrag #16278

Merged
merged 3 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG/CHANGELOG-3.6.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
- Add [`etcd --experimental-snapshot-catch-up-entries`](https://github.com/etcd-io/etcd/pull/15033) flag to configure number of entries for a slow follower to catch up after compacting the raft storage entries and defaults to 5k.
- Decreased [`--snapshot-count` default value from 100,000 to 10,000](https://github.com/etcd-io/etcd/pull/15408)
- Add [`etcd --tls-min-version --tls-max-version`](https://github.com/etcd-io/etcd/pull/15156) to enable support for TLS 1.3.
- Add [`etcd --experimental-stop-grpc-service-on-defrag`](https://github.com/etcd-io/etcd/pull/16278) to enable client failover on defrag.

### etcd grpc-proxy

Expand Down
3 changes: 3 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ type ServerConfig struct {
// a shared buffer in its readonly check operations.
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`

// ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation.
ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`

// ExperimentalBootstrapDefragThresholdMegabytes is the minimum number of megabytes needed to be freed for etcd server to
// consider running defrag during bootstrap. Needs to be set to non-zero value to take effect.
ExperimentalBootstrapDefragThresholdMegabytes uint `json:"experimental-bootstrap-defrag-threshold-megabytes"`
Expand Down
5 changes: 5 additions & 0 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,9 @@ type Config struct {
// ExperimentalTxnModeWriteWithSharedBuffer enables write transaction to use a shared buffer in its readonly check operations.
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`

// ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation.
ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`

// V2Deprecation describes phase of API & Storage V2 support
V2Deprecation config.V2DeprecationEnum `json:"v2-deprecation"`
}
Expand Down Expand Up @@ -529,6 +532,7 @@ func NewConfig() *Config {
ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
ExperimentalMemoryMlock: false,
ExperimentalTxnModeWriteWithSharedBuffer: true,
ExperimentalStopGRPCServiceOnDefrag: false,
ExperimentalMaxLearners: membership.DefaultMaxLearners,

ExperimentalCompactHashCheckEnabled: false,
Expand Down Expand Up @@ -725,6 +729,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
fs.DurationVar(&cfg.ExperimentalWarningUnaryRequestDuration, "experimental-warning-unary-request-duration", cfg.ExperimentalWarningUnaryRequestDuration, "Time duration after which a warning is generated if a unary request takes more time. It's deprecated, and will be decommissioned in v3.7. Use --warning-unary-request-duration instead.")
fs.BoolVar(&cfg.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
fs.BoolVar(&cfg.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
fs.BoolVar(&cfg.ExperimentalStopGRPCServiceOnDefrag, "experimental-stop-grpc-service-on-defrag", cfg.ExperimentalStopGRPCServiceOnDefrag, "Enable etcd gRPC service to stop serving client requests on defragmentation.")
fs.UintVar(&cfg.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
fs.IntVar(&cfg.ExperimentalMaxLearners, "experimental-max-learners", membership.DefaultMaxLearners, "Sets the maximum number of learners that can be available in the cluster membership.")
fs.Uint64Var(&cfg.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.SnapshotCatchUpEntries, "Number of entries for a slow follower to catch up after compacting the raft storage entries.")
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
WarningUnaryRequestDuration: cfg.WarningUnaryRequestDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
ExperimentalStopGRPCServiceOnDefrag: cfg.ExperimentalStopGRPCServiceOnDefrag,
ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
ExperimentalMaxLearners: cfg.ExperimentalMaxLearners,
V2Deprecation: cfg.V2DeprecationEffective(),
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ Experimental feature:
Enable to enforce etcd pages (in particular bbolt) to stay in RAM.
--experimental-snapshot-catchup-entries
Number of entries for a slow follower to catch up after compacting the raft storage entries.
--experimental-stop-grpc-service-on-defrag
Enable etcd gRPC service to stop serving client requests on defragmentation.

Unsafe feature:
--force-new-cluster 'false'
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3client/v3client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}

mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s, nil))
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)

clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
Expand Down
6 changes: 1 addition & 5 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))

// server should register all the services manually
// use empty service name for all etcd services' health status,
// see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, NewHealthNotifier(hsrv, s.Logger())))
healthpb.RegisterHealthServer(grpcServer, hsrv)

// set zero values for metrics registered for this grpc server
Expand Down
68 changes: 68 additions & 0 deletions server/etcdserver/api/v3rpc/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3rpc

import (
"go.uber.org/zap"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

const (
allGRPCServices = ""
)

type HealthNotifier interface {
chaochn47 marked this conversation as resolved.
Show resolved Hide resolved
StartServe()
StopServe(reason string)
}

func NewHealthNotifier(hs *health.Server, lg *zap.Logger) HealthNotifier {
if hs == nil {
panic("unexpected nil gRPC health server")
}
if lg == nil {
lg = zap.NewNop()
}
hc := &healthChecker{hs: hs, lg: lg}
chaochn47 marked this conversation as resolved.
Show resolved Hide resolved
// set grpc health server as serving status blindly since
// the grpc server will serve iff s.ReadyNotify() is closed.
hc.StartServe()
chaochn47 marked this conversation as resolved.
Show resolved Hide resolved
return hc
}

type healthChecker struct {
hs *health.Server
lg *zap.Logger
}

func (hc *healthChecker) StartServe() {
hc.lg.Info(
"grpc service status changed",
zap.String("service", allGRPCServices),
zap.String("status", healthpb.HealthCheckResponse_SERVING.String()),
)
hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_SERVING)
}

func (hc *healthChecker) StopServe(reason string) {
hc.lg.Warn(
"grpc service status changed",
zap.String("service", allGRPCServices),
zap.String("status", healthpb.HealthCheckResponse_NOT_SERVING.String()),
zap.String("reason", reason),
)
hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_NOT_SERVING)
}
11 changes: 9 additions & 2 deletions server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ type maintenanceServer struct {
cs ClusterStatusGetter
d Downgrader
vs serverversion.Server
hn HealthNotifier

stopServingOnDefrag bool
}

func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
func NewMaintenanceServer(s *etcdserver.EtcdServer, hn HealthNotifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), hn: hn, stopServingOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
Expand All @@ -86,6 +89,10 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
if ms.stopServingOnDefrag {
ms.hn.StopServe("defrag is active")
defer ms.hn.StartServe()
}
err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
Expand Down
163 changes: 163 additions & 0 deletions tests/e2e/failover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
_ "google.golang.org/grpc/health"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

const (
// in sync with how kubernetes uses etcd
// https://github.com/kubernetes/kubernetes/blob/release-1.28/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L59-L71
keepaliveTime = 30 * time.Second
keepaliveTimeout = 10 * time.Second
dialTimeout = 20 * time.Second

clientRuntime = 10 * time.Second
requestTimeout = 100 * time.Millisecond
)

func TestFailoverOnDefrag(t *testing.T) {
tcs := []struct {
name string
clusterOptions []e2e.EPClusterOption
gRPCDialOptions []grpc.DialOption

// common assertion
expectedMinTotalRequestsCount int
// happy case assertion
expectedMaxFailedRequestsCount int
// negative case assertion
expectedMinFailedRequestsCount int
chaochn47 marked this conversation as resolved.
Show resolved Hide resolved
}{
{
chaochn47 marked this conversation as resolved.
Show resolved Hide resolved
name: "defrag failover happy case",
clusterOptions: []e2e.EPClusterOption{
e2e.WithClusterSize(3),
e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
e2e.WithGoFailEnabled(true),
},
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinTotalRequestsCount: 300,
expectedMaxFailedRequestsCount: 5,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to false",
clusterOptions: []e2e.EPClusterOption{
e2e.WithClusterSize(3),
e2e.WithExperimentalStopGRPCServiceOnDefrag(false),
e2e.WithGoFailEnabled(true),
},
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinTotalRequestsCount: 300,
expectedMinFailedRequestsCount: 90,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled",
clusterOptions: []e2e.EPClusterOption{
e2e.WithClusterSize(3),
e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
e2e.WithGoFailEnabled(true),
},
expectedMinTotalRequestsCount: 300,
expectedMinFailedRequestsCount: 90,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
e2e.BeforeTest(t)
clus, cerr := e2e.NewEtcdProcessCluster(context.TODO(), t, tc.clusterOptions...)
require.NoError(t, cerr)
t.Cleanup(func() { clus.Stop() })

endpoints := clus.EndpointsGRPC()

requestVolume, successfulRequestCount := 0, 0
g := new(errgroup.Group)
g.Go(func() (lastErr error) {
clusterClient, cerr := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
Endpoints: endpoints,
DialOptions: tc.gRPCDialOptions,
})
if cerr != nil {
return cerr
}
defer clusterClient.Close()

timeout := time.After(clientRuntime)
for {
select {
case <-timeout:
return lastErr
default:
}
getContext, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err := clusterClient.Get(getContext, "health")
cancel()
requestVolume++
if err != nil {
lastErr = err
continue
}
chaochn47 marked this conversation as resolved.
Show resolved Hide resolved
successfulRequestCount++
}
})
triggerDefrag(t, clus.Procs[0])

err := g.Wait()
if err != nil {
t.Logf("etcd client failed to fail over, error (%v)", err)
}
t.Logf("request failure rate is %.2f%%, traffic volume successfulRequestCount %d requests, total %d requests", (1-float64(successfulRequestCount)/float64(requestVolume))*100, successfulRequestCount, requestVolume)

require.GreaterOrEqual(t, requestVolume, tc.expectedMinTotalRequestsCount)
failedRequestCount := requestVolume - successfulRequestCount
if tc.expectedMaxFailedRequestsCount != 0 {
require.LessOrEqual(t, failedRequestCount, tc.expectedMaxFailedRequestsCount)
}
if tc.expectedMinFailedRequestsCount != 0 {
require.GreaterOrEqual(t, failedRequestCount, tc.expectedMinFailedRequestsCount)
}
})
}
}

func triggerDefrag(t *testing.T, member e2e.EtcdProcess) {
require.NoError(t, member.Failpoints().SetupHTTP(context.Background(), "defragBeforeCopy", `sleep("10s")`))
require.NoError(t, member.Etcdctl().Defragment(context.Background(), config.DefragOption{Timeout: time.Minute}))
}
6 changes: 6 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@ func WithExperimentalWarningUnaryRequestDuration(time time.Duration) EPClusterOp
return func(c *EtcdProcessClusterConfig) { c.ServerConfig.ExperimentalWarningUnaryRequestDuration = time }
}

func WithExperimentalStopGRPCServiceOnDefrag(stopGRPCServiceOnDefrag bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) {
c.ServerConfig.ExperimentalStopGRPCServiceOnDefrag = stopGRPCServiceOnDefrag
}
}

func WithCompactionBatchLimit(limit int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.ServerConfig.ExperimentalCompactionBatchLimit = limit }
}
Expand Down
Loading
Loading