diff --git a/client/v3/client.go b/client/v3/client.go index 0c91889fa336..e0d94dcc2a43 100644 --- a/client/v3/client.go +++ b/client/v3/client.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/codes" grpccredentials "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + _ "google.golang.org/grpc/health" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" @@ -256,7 +257,11 @@ func (c *Client) Dial(ep string) (*grpc.ClientConn, error) { // Using ad-hoc created resolver, to guarantee only explicitly given // endpoint is used. - return c.dial(creds, grpc.WithResolvers(resolver.New(ep))) + serviceConfigJSON := `{"loadBalancingPolicy": "round_robin"}` + if c.cfg.EnableGRPCHealthCheck { + serviceConfigJSON = `{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}` + } + return c.dial(creds, grpc.WithResolvers(resolver.New(serviceConfigJSON, ep))) } func (c *Client) getToken(ctx context.Context) error { @@ -408,7 +413,11 @@ func newClient(cfg *Config) (*Client, error) { client.callOpts = callOpts } - client.resolver = resolver.New(cfg.Endpoints...) + serviceConfigJSON := `{"loadBalancingPolicy": "round_robin"}` + if cfg.EnableGRPCHealthCheck { + serviceConfigJSON = `{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}` + } + client.resolver = resolver.New(serviceConfigJSON, cfg.Endpoints...) if len(cfg.Endpoints) < 1 { client.cancel() diff --git a/client/v3/config.go b/client/v3/config.go index 4a26714a8645..38daad547192 100644 --- a/client/v3/config.go +++ b/client/v3/config.go @@ -69,6 +69,8 @@ type Config struct { // RejectOldCluster when set will refuse to create a client against an outdated cluster. RejectOldCluster bool `json:"reject-old-cluster"` + EnableGRPCHealthCheck bool `json:"enable-grpc-health-check"` + // DialOptions is a list of dial options for the grpc client (e.g., for interceptors). // For example, pass "grpc.WithBlock()" to block until the underlying connection is up. // Without this, Dial returns immediately and connecting the server happens in background. diff --git a/client/v3/internal/resolver/resolver.go b/client/v3/internal/resolver/resolver.go index b5c9de00786e..0587a7153445 100644 --- a/client/v3/internal/resolver/resolver.go +++ b/client/v3/internal/resolver/resolver.go @@ -30,18 +30,19 @@ const ( // using SetEndpoints. type EtcdManualResolver struct { *manual.Resolver - endpoints []string - serviceConfig *serviceconfig.ParseResult + endpoints []string + serviceConfigJSON string + serviceConfig *serviceconfig.ParseResult } -func New(endpoints ...string) *EtcdManualResolver { +func New(serviceConfigJSON string, endpoints ...string) *EtcdManualResolver { r := manual.NewBuilderWithScheme(Schema) - return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil} + return &EtcdManualResolver{Resolver: r, serviceConfigJSON: serviceConfigJSON, endpoints: endpoints, serviceConfig: nil} } // Build returns itself for Resolver, because it's both a builder and a resolver. func (r *EtcdManualResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - r.serviceConfig = cc.ParseServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) + r.serviceConfig = cc.ParseServiceConfig(r.serviceConfigJSON) if r.serviceConfig.Err != nil { return nil, r.serviceConfig.Err } diff --git a/server/etcdserver/api/v3client/v3client.go b/server/etcdserver/api/v3client/v3client.go index c44479ffad24..b9d18399f2d0 100644 --- a/server/etcdserver/api/v3client/v3client.go +++ b/server/etcdserver/api/v3client/v3client.go @@ -39,7 +39,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client { wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s)) c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)} - mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s)) + mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s, nil)) c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c) clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s)) diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index 349ebea40074..38d95dd729c7 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -76,7 +76,6 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s)) pb.RegisterClusterServer(grpcServer, NewClusterServer(s)) pb.RegisterAuthServer(grpcServer, NewAuthServer(s)) - pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s)) // server should register all the services manually // use empty service name for all etcd services' health status, @@ -85,6 +84,8 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) healthpb.RegisterHealthServer(grpcServer, hsrv) + pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, hsrv)) + // set zero values for metrics registered for this grpc server grpc_prometheus.Register(grpcServer) diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index 9920a9cbd6b8..7b69f33bdf10 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -21,6 +21,10 @@ import ( "time" "github.com/dustin/go-humanize" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + + "go.etcd.io/raft/v3" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" @@ -32,7 +36,6 @@ import ( "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/schema" - "go.etcd.io/raft/v3" "go.uber.org/zap" ) @@ -75,10 +78,11 @@ type maintenanceServer struct { cs ClusterStatusGetter d Downgrader vs serverversion.Server + hs *health.Server } -func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer { - srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)} +func NewMaintenanceServer(s *etcdserver.EtcdServer, hs *health.Server) pb.MaintenanceServer { + srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), hs: hs} if srv.lg == nil { srv.lg = zap.NewNop() } @@ -87,6 +91,11 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer { func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) { ms.lg.Info("starting defragment") + if ms.hs != nil { + ms.hs.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING) + defer ms.hs.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) + } + err := ms.bg.Backend().Defrag() if err != nil { ms.lg.Warn("failed to defragment", zap.Error(err)) diff --git a/tests/integration/v3_failover_test.go b/tests/integration/v3_failover_test.go index 9d271bd9fa90..8bab82a3ec44 100644 --- a/tests/integration/v3_failover_test.go +++ b/tests/integration/v3_failover_test.go @@ -18,9 +18,11 @@ import ( "bytes" "context" "crypto/tls" + "strconv" "testing" "time" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" @@ -29,6 +31,80 @@ import ( clientv3test "go.etcd.io/etcd/tests/v3/integration/clientv3" ) +func TestFailoverOnDefrag(t *testing.T) { + integration2.BeforeTest(t, integration2.WithFailpoint("defragBeforeCopy", `sleep(10000)`)) + clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + endpoints := clus.Endpoints() + + cnt, success := 0, 0 + donec := make(chan struct{}) + errc := make(chan error, 1) + + go func() { + var lastErr error + var cc *clientv3.Client + defer func() { + if cc != nil { + cc.Close() + } + errc <- lastErr + close(donec) + close(errc) + }() + cc, cerr := clientv3.New(clientv3.Config{ + DialTimeout: 200 * time.Millisecond, + DialKeepAliveTime: 1 * time.Second, + DialKeepAliveTimeout: 200 * time.Millisecond, + Endpoints: endpoints, + EnableGRPCHealthCheck: true, + }) + require.NoError(t, cerr) + timeout := time.After(10 * time.Second) + for { + select { + case <-timeout: + return + default: + } + + // only simulate traffic when defrag is active. + m, err := clus.Members[0].Metric("etcd_disk_defrag_inflight") + if err != nil { + continue + } + defragActive, err := strconv.Atoi(m) + if err != nil { + panic(err) + } + if defragActive == 0 { + continue + } + + cctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + _, err = cc.Get(cctx, "health") + cancel() + cnt++ + if err != nil { + lastErr = err + continue + } + success++ + } + }() + // wait for the grpc connection is established + time.Sleep(500 * time.Millisecond) + _, err := clus.Client(0).Defragment(context.Background(), endpoints[0]) + require.NoError(t, err) + + <-donec + err, ok := <-errc + if ok && err != nil { + t.Errorf("etcd client failed to fail over, error (%v)", err) + } + t.Logf("request failure rate is %.2f%%, traffic volume success %d requests, total %d requests", (1-float64(success)/float64(cnt))*100, success, cnt) +} + func TestFailover(t *testing.T) { cases := []struct { name string