etcdserver: move rpc defrag notifier into backend. #16959

Open · wants to merge 1 commit into main
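In short, the change moves the defrag start/finish notification out of the v3rpc maintenance server and into the storage backend: the gRPC health notifier now implements the new backend.DefragNotifier interface and registers itself through Backend.SubscribeDefragNotifier, and the backend fans the callbacks out around its defrag() call. The standalone sketch below (not code from this PR; toyBackend and loggingNotifier are hypothetical stand-ins for the real backend and health notifier) only illustrates that subscribe/notify pattern:

package main

import (
	"fmt"
	"sync"
)

// DefragNotifier mirrors the interface added to server/storage/backend in this PR.
type DefragNotifier interface {
	DefragStarted()
	DefragFinished()
}

// toyBackend is a hypothetical stand-in for the real backend, showing only the
// subscribe/notify mechanics around a defrag.
type toyBackend struct {
	notifierMu      sync.RWMutex
	defragNotifiers []DefragNotifier
}

func (b *toyBackend) SubscribeDefragNotifier(n DefragNotifier) {
	b.notifierMu.Lock()
	defer b.notifierMu.Unlock()
	b.defragNotifiers = append(b.defragNotifiers, n)
}

func (b *toyBackend) Defrag() {
	// Tell subscribers defrag is starting, do the work, then tell them it finished.
	for _, n := range b.defragNotifiers {
		n.DefragStarted()
	}
	defer func() {
		for _, n := range b.defragNotifiers {
			n.DefragFinished()
		}
	}()
	fmt.Println("defragmenting...")
}

// loggingNotifier plays the role of the gRPC health notifier.
type loggingNotifier struct{}

func (loggingNotifier) DefragStarted()  { fmt.Println("stop serving: defrag is active") }
func (loggingNotifier) DefragFinished() { fmt.Println("resume serving") }

func main() {
	b := &toyBackend{}
	b.SubscribeDefragNotifier(loggingNotifier{})
	b.Defrag() // prints the start notification, the defrag message, then the finish notification
}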
12 changes: 4 additions & 8 deletions server/etcdserver/api/v3rpc/health.go
@@ -20,18 +20,14 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage/backend"
)

const (
allGRPCServices = ""
)

-type notifier interface {
-	defragStarted()
-	defragFinished()
-}

-func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier {
+func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) backend.DefragNotifier {
if hs == nil {
panic("unexpected nil gRPC health server")
}
@@ -49,14 +45,14 @@ type healthNotifier struct
stopGRPCServiceOnDefrag bool
}

-func (hc *healthNotifier) defragStarted() {
+func (hc *healthNotifier) DefragStarted() {
if !hc.stopGRPCServiceOnDefrag {
return
}
hc.stopServe("defrag is active")
}

-func (hc *healthNotifier) defragFinished() { hc.startServe() }
+func (hc *healthNotifier) DefragFinished() { hc.startServe() }

func (hc *healthNotifier) startServe() {
hc.lg.Info(
11 changes: 5 additions & 6 deletions server/etcdserver/api/v3rpc/maintenance.go
@@ -74,22 +74,21 @@ type maintenanceServer struct
cs ClusterStatusGetter
d Downgrader
vs serverversion.Server

-	healthNotifier notifier
}

-func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier notifier) pb.MaintenanceServer {
-	srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), healthNotifier: healthNotifier}
+func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier backend.DefragNotifier) pb.MaintenanceServer {
+	srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
+	if healthNotifier != nil {
+		s.Backend().SubscribeDefragNotifier(healthNotifier)
+	}
return &authMaintenanceServer{srv, &AuthAdmin{s}}
}

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
-	ms.healthNotifier.defragStarted()
-	defer ms.healthNotifier.defragFinished()
err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
31 changes: 31 additions & 0 deletions server/storage/backend/backend.go
@@ -71,6 +71,8 @@ type Backend interface {

// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
SetTxPostLockInsideApplyHook(func())

+	SubscribeDefragNotifier(notifier DefragNotifier)
}

type Snapshot interface {
@@ -82,6 +84,11 @@
Close() error
}

+type DefragNotifier interface {
+	DefragStarted()
+	DefragFinished()
+}

type txReadBufferCache struct {
mu sync.Mutex
buf *txReadBuffer
@@ -127,6 +134,9 @@ type backend struct {
txPostLockInsideApplyHook func()

lg *zap.Logger

+	defragNotifiers []DefragNotifier
+	notifierMu      sync.RWMutex
}

type BackendConfig struct {
@@ -445,6 +455,24 @@ func (b *backend) Defrag() error {
return b.defrag()
}

+func (b *backend) SubscribeDefragNotifier(notifier DefragNotifier) {
+	b.notifierMu.Lock()
+	defer b.notifierMu.Unlock()
+	b.defragNotifiers = append(b.defragNotifiers, notifier)
+}
+
+func (b *backend) defragStarted() {
+	for _, notifier := range b.defragNotifiers {
+		notifier.DefragStarted()
+	}
+}
+
+func (b *backend) defragFinished() {
+	for _, notifier := range b.defragNotifiers {
+		notifier.DefragFinished()
+	}
+}

func (b *backend) defrag() error {
now := time.Now()
isDefragActive.Set(1)
@@ -459,6 +487,9 @@
// lock database after lock tx to avoid deadlock.
b.mu.Lock()
defer b.mu.Unlock()
+	// send notifications after acquiring the lock.
+	b.defragStarted()
Member:

Notifying listeners under the lock is a simple and correct way to fix the race of two goroutines calling Defrag at once. However, the current implementation has one flaw: it introduces an external call under a lock.

The backend just calls and blocks on the notifiers, but doesn't really know what they are doing, so we should be very careful about introducing a blocking call here. Do you know whether the call to .SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_SERVING) is blocking?

Contributor Author:

The SetServingStatus call is not blocking. It sends the update to a channel, but clears the channel first, and the lock is only used for simple operations.

// SetServingStatus is called when need to reset the serving status of a service
// or insert a new service entry into the statusMap.
func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.shutdown {
		logger.Infof("health: status changing for %s to %v is ignored because health service is shutdown", service, servingStatus)
		return
	}

	s.setServingStatusLocked(service, servingStatus)
}

func (s *Server) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
	s.statusMap[service] = servingStatus
	for _, update := range s.updates[service] {
		// Clears previous updates, that are not sent to the client, from the channel.
		// This can happen if the client is not reading and the server gets flow control limited.
		select {
		case <-update:
		default:
		}
		// Puts the most recent update to the channel.
		update <- servingStatus
	}
}

Do you think we should make the defragStarted call in a separate goroutine to remove such concerns?
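For illustration only, here is a rough sketch of what that option could look like. It is not part of this PR; notifyAsync is a hypothetical helper, and it assumes the defragNotifiers/notifierMu fields added above:

// Hypothetical helper: snapshot the subscribers under the read lock, then
// invoke the callbacks from a separate goroutine so the backend never blocks
// on a notifier while holding its own locks.
func (b *backend) notifyAsync(fn func(DefragNotifier)) {
	b.notifierMu.RLock()
	notifiers := make([]DefragNotifier, len(b.defragNotifiers))
	copy(notifiers, b.defragNotifiers)
	b.notifierMu.RUnlock()

	go func() {
		for _, n := range notifiers {
			fn(n)
		}
	}()
}

// defrag() would then call b.notifyAsync(DefragNotifier.DefragStarted) and
// defer b.notifyAsync(DefragNotifier.DefragFinished) instead of the direct calls.

The trade-off is that the start/finish callbacks are no longer strictly ordered with respect to the defrag itself, so a subscriber could observe them late or out of order.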

+	defer b.defragFinished()

// block concurrent read requests while resetting tx
b.readTx.Lock()
1 change: 1 addition & 0 deletions server/storage/mvcc/kvstore_test.go
@@ -977,6 +977,7 @@ func (b *fakeBackend) ForceCommit()
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}
+func (b *fakeBackend) SubscribeDefragNotifier(backend.DefragNotifier) {}

type indexGetResp struct {
rev Revision