From 8529110921c571c00c5e1783db6fa51d81c2d369 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Thu, 16 Nov 2023 14:14:30 -0800 Subject: [PATCH] etcdserver: move rpc defrag notifier into backend. Signed-off-by: Siyuan Zhang --- server/etcdserver/api/v3rpc/health.go | 12 +++----- server/etcdserver/api/v3rpc/maintenance.go | 9 ++---- server/storage/backend/backend.go | 34 ++++++++++++++++++++++ server/storage/mvcc/kvstore_test.go | 1 + 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/server/etcdserver/api/v3rpc/health.go b/server/etcdserver/api/v3rpc/health.go index e87140d17432..c6a889859681 100644 --- a/server/etcdserver/api/v3rpc/health.go +++ b/server/etcdserver/api/v3rpc/health.go @@ -20,18 +20,14 @@ import ( healthpb "google.golang.org/grpc/health/grpc_health_v1" "go.etcd.io/etcd/server/v3/etcdserver" + "go.etcd.io/etcd/server/v3/storage/backend" ) const ( allGRPCServices = "" ) -type notifier interface { - defragStarted() - defragFinished() -} - -func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier { +func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) backend.DefragNotifier { if hs == nil { panic("unexpected nil gRPC health server") } @@ -49,14 +45,14 @@ type healthNotifier struct { stopGRPCServiceOnDefrag bool } -func (hc *healthNotifier) defragStarted() { +func (hc *healthNotifier) DefragStarted() { if !hc.stopGRPCServiceOnDefrag { return } hc.stopServe("defrag is active") } -func (hc *healthNotifier) defragFinished() { hc.startServe() } +func (hc *healthNotifier) DefragFinished() { hc.startServe() } func (hc *healthNotifier) startServe() { hc.lg.Info( diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index 84506bd6b3c1..e4f5f468d2fe 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -74,22 +74,19 @@ type maintenanceServer struct { cs ClusterStatusGetter d Downgrader vs serverversion.Server - - healthNotifier notifier } -func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier notifier) pb.MaintenanceServer { - srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), healthNotifier: healthNotifier} +func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier backend.DefragNotifier) pb.MaintenanceServer { + srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)} if srv.lg == nil { srv.lg = zap.NewNop() } + s.Backend().SubscribeDefragNotifier(healthNotifier) return &authMaintenanceServer{srv, &AuthAdmin{s}} } func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) { ms.lg.Info("starting defragment") - ms.healthNotifier.defragStarted() - defer ms.healthNotifier.defragFinished() err := ms.bg.Backend().Defrag() if err != nil { ms.lg.Warn("failed to defragment", zap.Error(err)) diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index e7b951ee7e6b..ce204f2ae182 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -71,6 +71,8 @@ type Backend interface { // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook. SetTxPostLockInsideApplyHook(func()) + + SubscribeDefragNotifier(notifier DefragNotifier) } type Snapshot interface { @@ -82,6 +84,11 @@ type Snapshot interface { Close() error } +type DefragNotifier interface { + DefragStarted() + DefragFinished() +} + type txReadBufferCache struct { mu sync.Mutex buf *txReadBuffer @@ -127,6 +134,9 @@ type backend struct { txPostLockInsideApplyHook func() lg *zap.Logger + + defragNotifiers []DefragNotifier + notifierMu sync.RWMutex } type BackendConfig struct { @@ -445,6 +455,27 @@ func (b *backend) Defrag() error { return b.defrag() } +func (b *backend) SubscribeDefragNotifier(notifier DefragNotifier) { + if notifier == nil { + return + } + b.notifierMu.Lock() + defer b.notifierMu.Unlock() + b.defragNotifiers = append(b.defragNotifiers, notifier) +} + +func (b *backend) defragStarted() { + for _, notifier := range b.defragNotifiers { + notifier.DefragStarted() + } +} + +func (b *backend) defragFinished() { + for _, notifier := range b.defragNotifiers { + notifier.DefragFinished() + } +} + func (b *backend) defrag() error { now := time.Now() isDefragActive.Set(1) @@ -459,6 +490,9 @@ func (b *backend) defrag() error { // lock database after lock tx to avoid deadlock. b.mu.Lock() defer b.mu.Unlock() + // send notifications after acquiring the lock. + b.defragStarted() + defer b.defragFinished() // block concurrent read requests while resetting tx b.readTx.Lock() diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 3e413596cc6b..798c3429c959 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -977,6 +977,7 @@ func (b *fakeBackend) ForceCommit() func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {} +func (b *fakeBackend) SubscribeDefragNotifier(backend.DefragNotifier) {} type indexGetResp struct { rev Revision