Skip to content

Commit

Permalink
etcdserver: move rpc defrag notifier into backend.
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <[email protected]>
  • Loading branch information
siyuanfoundation committed Nov 16, 2023
1 parent 1bade2c commit 8529110
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 14 deletions.
12 changes: 4 additions & 8 deletions server/etcdserver/api/v3rpc/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage/backend"
)

const (
allGRPCServices = ""
)

type notifier interface {
defragStarted()
defragFinished()
}

func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier {
func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) backend.DefragNotifier {
if hs == nil {
panic("unexpected nil gRPC health server")
}
Expand All @@ -49,14 +45,14 @@ type healthNotifier struct {
stopGRPCServiceOnDefrag bool
}

func (hc *healthNotifier) defragStarted() {
func (hc *healthNotifier) DefragStarted() {
if !hc.stopGRPCServiceOnDefrag {
return
}
hc.stopServe("defrag is active")
}

func (hc *healthNotifier) defragFinished() { hc.startServe() }
func (hc *healthNotifier) DefragFinished() { hc.startServe() }

func (hc *healthNotifier) startServe() {
hc.lg.Info(
Expand Down
9 changes: 3 additions & 6 deletions server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,19 @@ type maintenanceServer struct {
cs ClusterStatusGetter
d Downgrader
vs serverversion.Server

healthNotifier notifier
}

func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier notifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), healthNotifier: healthNotifier}
func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier backend.DefragNotifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
s.Backend().SubscribeDefragNotifier(healthNotifier)
return &authMaintenanceServer{srv, &AuthAdmin{s}}
}

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
ms.healthNotifier.defragStarted()
defer ms.healthNotifier.defragFinished()
err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
Expand Down
34 changes: 34 additions & 0 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type Backend interface {

// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
SetTxPostLockInsideApplyHook(func())

SubscribeDefragNotifier(notifier DefragNotifier)
}

type Snapshot interface {
Expand All @@ -82,6 +84,11 @@ type Snapshot interface {
Close() error
}

type DefragNotifier interface {
DefragStarted()
DefragFinished()
}

type txReadBufferCache struct {
mu sync.Mutex
buf *txReadBuffer
Expand Down Expand Up @@ -127,6 +134,9 @@ type backend struct {
txPostLockInsideApplyHook func()

lg *zap.Logger

defragNotifiers []DefragNotifier
notifierMu sync.RWMutex
}

type BackendConfig struct {
Expand Down Expand Up @@ -445,6 +455,27 @@ func (b *backend) Defrag() error {
return b.defrag()
}

func (b *backend) SubscribeDefragNotifier(notifier DefragNotifier) {
if notifier == nil {
return
}
b.notifierMu.Lock()
defer b.notifierMu.Unlock()
b.defragNotifiers = append(b.defragNotifiers, notifier)
}

func (b *backend) defragStarted() {
for _, notifier := range b.defragNotifiers {
notifier.DefragStarted()
}
}

func (b *backend) defragFinished() {
for _, notifier := range b.defragNotifiers {
notifier.DefragFinished()
}
}

func (b *backend) defrag() error {
now := time.Now()
isDefragActive.Set(1)
Expand All @@ -459,6 +490,9 @@ func (b *backend) defrag() error {
// lock database after lock tx to avoid deadlock.
b.mu.Lock()
defer b.mu.Unlock()
// send notifications after acquiring the lock.
b.defragStarted()
defer b.defragFinished()

// block concurrent read requests while resetting tx
b.readTx.Lock()
Expand Down
1 change: 1 addition & 0 deletions server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,7 @@ func (b *fakeBackend) ForceCommit()
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}
func (b *fakeBackend) SubscribeDefragNotifier(backend.DefragNotifier) {}

type indexGetResp struct {
rev Revision
Expand Down

0 comments on commit 8529110

Please sign in to comment.