
[Main branch - etcd 3.6] PoC of client failover #9

Open · wants to merge 2 commits into main
1 change: 1 addition & 0 deletions .github/workflows/e2e.yaml
@@ -30,6 +30,7 @@ jobs:
run: |
set -euo pipefail
go clean -testcache
make gofail-enable

echo "${TARGET}"
case "${TARGET}" in
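Note: the added make gofail-enable step compiles in the gofail failpoints (e.g. raftBeforeSave, defragBeforeCopy) that the new failover e2e test below depends on.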
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3client/v3client.go
@@ -39,7 +39,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}

mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s, v3rpc.NewHealthChecker(nil, s)))
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)

clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
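Note: in this embedded-client path the health checker is constructed with a nil *health.Server, so StartServe/StopServe and the background prober are effectively no-ops (they all bail out when hs is nil, see health.go below); only the client-facing gRPC server wired up in grpc.go gets a live health notifier.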
5 changes: 3 additions & 2 deletions server/etcdserver/api/v3rpc/grpc.go
@@ -75,14 +75,15 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))

// server should register all the services manually
// use empty service name for all etcd services' health status,
// see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(grpcServer, hsrv)
hc := NewHealthChecker(hsrv, s)

pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, hc))

// set zero values for metrics registered for this grpc server
grpc_prometheus.Register(grpcServer)
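What this registration exposes: any gRPC client can now ask the member for the empty-service health status that the checker toggles between SERVING and NOT_SERVING. A minimal sketch (not part of this PR) of such a query, assuming a plain non-TLS client URL; the address is a placeholder:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Placeholder endpoint; point this at a member's client URL.
	conn, err := grpc.Dial("127.0.0.1:2379", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Empty service name: the status the health checker flips via SetServingStatus("", ...).
	resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp.Status) // SERVING or NOT_SERVING
}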
155 changes: 155 additions & 0 deletions server/etcdserver/api/v3rpc/health.go
@@ -0,0 +1,155 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3rpc

import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/zap"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.etcd.io/raft/v3"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
)

type healthNotifier interface {
StartServe()
StopServe(reason string)
}

type healthChecker struct {
hs *health.Server

mu sync.RWMutex
served bool

// implements readyz prober
s *etcdserver.EtcdServer
// prober configuration TODO 2023-09-13 externalize as etcd server flag
successThreshold int
failureThreshold int
healthCheckInterval time.Duration
// prober state
successCount int
failureCount int
}

func NewHealthChecker(hs *health.Server, s *etcdserver.EtcdServer) healthNotifier {
hc := &healthChecker{hs: hs, s: s, successThreshold: 2, failureThreshold: 3, healthCheckInterval: 100 * time.Millisecond}
// Set the gRPC health server to SERVING up front, without probing first,
// since the gRPC server only starts serving once s.ReadyNotify() is closed.
hc.StartServe()

go hc.startProbe()
return hc
}

func (hc *healthChecker) StartServe() {
if hc.hs != nil {
if hc.s.Logger() != nil {
hc.s.Logger().Info("start serving gRPC requests from client")
}
hc.hs.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
hc.changeServedTo(true)
}
}

func (hc *healthChecker) StopServe(reason string) {
if hc.hs != nil {
if hc.s.Logger() != nil {
hc.s.Logger().Warn("stop serving gRPC requests from client", zap.String("reason", reason))
}
hc.hs.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
hc.changeServedTo(false)
}
}

func (hc *healthChecker) changeServedTo(served bool) {
hc.mu.Lock()
defer hc.mu.Unlock()
hc.served = served
}

func (hc *healthChecker) IsServed() bool {
hc.mu.RLock()
defer hc.mu.RUnlock()
return hc.served
}

func (hc *healthChecker) startProbe() {
// nothing consumes the probe results without a health server, so don't probe.
if hc.hs == nil {
return
}

ticker := time.NewTicker(hc.healthCheckInterval)
defer ticker.Stop()
for {
select {
case <-hc.s.StoppingNotify():
return
case <-ticker.C:
if ready, notReadyReason := checkReadyz(hc.s, 2*time.Second); ready {
hc.successCount += 1
hc.failureCount = 0
if hc.successCount >= hc.successThreshold && !hc.IsServed() {
hc.StartServe()
}
} else {
hc.failureCount += 1
hc.successCount = 0
hc.s.Logger().Warn("readyz check failed", zap.Int("failure-count", hc.failureCount))
if hc.failureCount >= hc.failureThreshold && hc.IsServed() {
hc.StopServe(notReadyReason)
}
}
}
}
}

// The checkReadyz prober should stay in sync with the readyz design going forward:
// https://docs.google.com/document/d/1109lUxD326yRwmMVX-tkJMpm8pTbOw7MD1XsSIo37MU
// For now it mirrors the etcdhttp.HandleHealth implementation:
// 1. checkLeader
// 2. checkAlarms
// 3. checkQuorumRead
func checkReadyz(s *etcdserver.EtcdServer, timeout time.Duration) (ready bool, notReadyReason string) {
if s.Leader() == types.ID(raft.None) {
return false, "local member does not have leader"
}
for _, alarm := range s.Alarms() {
if types.ID(alarm.MemberID) == s.MemberId() && alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
return false, "local member has corrupt alarm"
}
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
// Range requests only time out in the LinearizableReadNotify phase; during a
// defrag they block in the mvcc phase instead. Relying solely on
// LinearizableReadNotify for readyz would therefore report a false-positive
// serving status while a defrag is active.
_, err := s.Range(ctx, &etcdserverpb.RangeRequest{KeysOnly: true, Limit: 1})
cancel()
if err != nil && err != auth.ErrUserEmpty && err != auth.ErrPermissionDenied {
return false, fmt.Sprintf("local member quorum read with %s timeout failed due to %v", timeout.String(), err)
}
return true, ""
}
10 changes: 7 additions & 3 deletions server/etcdserver/api/v3rpc/maintenance.go
@@ -22,6 +22,8 @@ import (

"github.com/dustin/go-humanize"

"go.etcd.io/raft/v3"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/api/v3/version"
@@ -32,7 +34,6 @@ import (
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/raft/v3"

"go.uber.org/zap"
)
@@ -75,10 +76,11 @@ type maintenanceServer struct {
cs ClusterStatusGetter
d Downgrader
vs serverversion.Server
hs healthNotifier
}

func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
func NewMaintenanceServer(s *etcdserver.EtcdServer, hs healthNotifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), hs: hs}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
@@ -87,6 +89,8 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
ms.hs.StopServe("defrag is active")
defer ms.hs.StartServe()
err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
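For the NOT_SERVING signal to actually move traffic off the defragmenting member, the caller has to opt into gRPC client-side health checking, as the e2e test below does. A minimal sketch (not part of this PR) of such a client; the endpoint is a placeholder:

package main

import (
	"log"

	"google.golang.org/grpc"
	_ "google.golang.org/grpc/health" // registers the gRPC client-side health checking implementation

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"127.0.0.1:2379"}, // placeholder endpoint
		DialOptions: []grpc.DialOption{
			grpc.WithDisableServiceConfig(),
			// round_robin + empty serviceName: the balancer stops sending requests to
			// members whose health status is NOT_SERVING, e.g. one running Defragment.
			grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
}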
163 changes: 163 additions & 0 deletions tests/e2e/failover_test.go
@@ -0,0 +1,163 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
_ "google.golang.org/grpc/health"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

const (
// kept in sync with how Kubernetes configures its etcd client:
// https://github.com/kubernetes/kubernetes/blob/release-1.28/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L59-L71
keepaliveTime = 30 * time.Second
keepaliveTimeout = 10 * time.Second
dialTimeout = 20 * time.Second

clientRuntime = 10 * time.Second
failureDetectionLatency = 6 * time.Second
// expect no more than 5 failed requests
failedRequests = 5
)

func TestFailover(t *testing.T) {
tcs := []struct {
name string
clusterOptions []e2e.EPClusterOption
failureInjector func(t *testing.T, clus *e2e.EtcdProcessCluster)
failureDetectionLatency time.Duration
}{
{
name: "network_partition",
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithIsPeerTLS(true), e2e.WithPeerProxy(true)},
failureInjector: blackhole,
failureDetectionLatency: failureDetectionLatency,
},
{
name: "stalled_disk_write",
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithGoFailEnabled(true)},
failureInjector: blockDiskWrite,
failureDetectionLatency: failureDetectionLatency,
},
{
name: "defrag",
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithGoFailEnabled(true)},
failureInjector: triggerDefrag,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
e2e.BeforeTest(t)
clus, err := e2e.NewEtcdProcessCluster(context.TODO(), t, tc.clusterOptions...)
require.NoError(t, err)
t.Cleanup(func() { clus.Stop() })
endpoints := clus.EndpointsGRPC()

cnt, success := 0, 0
donec := make(chan struct{})
errc := make(chan error, 1)

go func() {
var lastErr error
var cc *clientv3.Client
defer func() {
if cc != nil {
cc.Close()
}
errc <- lastErr
close(donec)
close(errc)
}()
cc, cerr := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
Endpoints: endpoints,
DialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
// the commented-out service config below would disable the gRPC client health check
//grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
},
})
require.NoError(t, cerr)
timeout := time.After(clientRuntime)

time.Sleep(tc.failureDetectionLatency)
for {
select {
case <-timeout:
return
default:
}
cctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
//start := time.Now()
_, err = cc.Get(cctx, "health")
//t.Logf("TS (%s): number #%d health check took %v", time.Now().UTC().Format(time.RFC3339), cnt, time.Since(start))
cancel()
cnt++
if err != nil {
lastErr = err
continue
}
success++
}
}()

tc.failureInjector(t, clus)

<-donec
err, ok := <-errc
if ok && err != nil {
t.Logf("etcd client failed to fail over, error (%v)", err)
}
t.Logf("request failure rate is %.2f%%, traffic volume success %d requests, total %d requests", (1-float64(success)/float64(cnt))*100, success, cnt)
// expect no more than 5 failed requests
require.InDelta(t, cnt, success, failedRequests)
})
}
}

func blackhole(t *testing.T, clus *e2e.EtcdProcessCluster) {
member := clus.Procs[0]
proxy := member.PeerProxy()
t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
}

func blockDiskWrite(t *testing.T, clus *e2e.EtcdProcessCluster) {
err := clus.Procs[0].Failpoints().Setup(context.Background(), "raftBeforeSave", `sleep(10000)`)
require.NoError(t, err)
clus.Procs[0].Etcdctl().Put(context.Background(), "foo", "bar", config.PutOptions{})
}

func triggerDefrag(t *testing.T, clus *e2e.EtcdProcessCluster) {
err := clus.Procs[0].Failpoints().Setup(context.Background(), "defragBeforeCopy", `sleep(8000)`)
require.NoError(t, err)
err = clus.Procs[0].Etcdctl().Defragment(context.Background(), config.DefragOption{Timeout: time.Minute})
require.NoError(t, err)
}