Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rel-v0.28] Full snapshot lease update retry on failure #747

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)
if opts.EnabledLeaseRenewal {
// Update revisions in holder identity of full snapshot lease.
ctx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName, opts.DeltaSnapshotLeaseName); err != nil {
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName); err != nil {
cp.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand Down
12 changes: 5 additions & 7 deletions pkg/health/heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (hb *Heartbeat) RenewMemberLease(ctx context.Context) error {
return nil
}

// UpdateFullSnapshotLease renews the full snapshot lease and updates the holderIdentity field with the last revision in the latest full snapshot
// UpdateFullSnapshotLease renews the full snapshot lease and updates the holderIdentity field with the last revision in the latest full snapshot.
func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error {
if k8sClientset == nil {
return &errors.EtcdError{
Expand Down Expand Up @@ -221,7 +221,6 @@ func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnap
Message: fmt.Sprintf("Failed to update full snapshot lease: %v", err),
}
}

return nil
}

Expand Down Expand Up @@ -288,18 +287,17 @@ func UpdateDeltaSnapshotLease(ctx context.Context, logger *logrus.Entry, prevDel
return nil
}

// FullSnapshotCaseLeaseUpdate Updates the fullsnapshot lease and the deltasnapshot lease as needed when a full snapshot is taken
func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string, deltaSnapshotLeaseName string) error {
// FullSnapshotCaseLeaseUpdate Updates the fullsnapshot lease as needed when a full snapshot is taken
func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error {
if err := UpdateFullSnapshotLease(ctx, logger, fullSnapshot, k8sClientset, fullSnapshotLeaseName); err != nil {
return &errors.EtcdError{
Message: fmt.Sprintf("Failed to update full snapshot lease: %v", err),
}
}

return nil
}

// DeltaSnapshotCaseLeaseUpdate Updates the fullsnapshot lease and the deltasnapshot lease as needed when a delta snapshot is taken
// DeltaSnapshotCaseLeaseUpdate Updates the deltasnapshot lease as needed when a delta snapshot is taken
func DeltaSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, k8sClientset client.Client, deltaSnapshotLeaseName string, store brtypes.SnapStore) error {
_, latestDeltaSnapshotList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
if err == nil {
Expand Down Expand Up @@ -353,7 +351,7 @@ func RenewMemberLeasePeriodically(ctx context.Context, stopCh chan struct{}, hco
hb.logger.Info("Stopped member lease renewal timer")
return nil
case <-stopCh:
hb.logger.Info("Stoping the member lease renewal")
hb.logger.Info("Stopping the member lease renewal")
return nil
}
}
Expand Down
28 changes: 23 additions & 5 deletions pkg/health/heartbeat/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,46 @@ var _ = Describe("Heartbeat", func() {
Expect(os.Getenv("POD_NAME")).Should(Equal("test_pod"))
Expect(os.Getenv("POD_NAMESPACE")).Should(Equal("test_namespace"))

snap := &brtypes.Snapshot{
prevFullSnap := &brtypes.Snapshot{
Kind: brtypes.SnapshotKindFull,
CreatedOn: time.Now(),
StartRevision: 0,
LastRevision: 980,
}
prevFullSnap.GenerateSnapshotName()

latestFullSnap := &brtypes.Snapshot{
Kind: brtypes.SnapshotKindFull,
CreatedOn: time.Now(),
StartRevision: 0,
LastRevision: 989,
}
snap.GenerateSnapshotName()
latestFullSnap.GenerateSnapshotName()
err := k8sClientset.Create(context.TODO(), lease)
Expect(err).ShouldNot(HaveOccurred())

err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, snap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
// Update full snapshot lease with the first full snapshot
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, prevFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
Expect(err).ShouldNot(HaveOccurred())

l := &v1.Lease{}
Expect(k8sClientset.Get(context.TODO(), client.ObjectKey{
Namespace: lease.Namespace,
Name: lease.Name,
}, l)).To(Succeed())
Expect(l.Spec.HolderIdentity).To(PointTo(Equal("980")))

Expect(l.Spec.HolderIdentity).To(PointTo(Equal("989")))
// Trigger full snapshot lease update with latest full snapshot which is not the first full snapshot
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, latestFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
Expect(err).ShouldNot(HaveOccurred())

l = &v1.Lease{}
Expect(k8sClientset.Get(context.TODO(), client.ObjectKey{
Namespace: lease.Namespace,
Name: lease.Name,
}, l)).To(Succeed())
Expect(l.Spec.HolderIdentity).To(PointTo(Equal("989")))

err = k8sClientset.Delete(context.TODO(), l)
Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -132,7 +150,7 @@ var _ = Describe("Heartbeat", func() {

Expect(k8sClientset.Create(context.TODO(), lease)).To(Succeed())

err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
err := heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
Expect(err).Should(HaveOccurred())

err = k8sClientset.Delete(context.TODO(), lease)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/backuprestoreserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex
if b.config.HealthConfig.SnapshotLeaseRenewalEnabled {
leaseUpdatectx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration)
defer cancel()
if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName, b.config.HealthConfig.DeltaSnapshotLeaseName); err != nil {
if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName); err != nil {
b.logger.Warnf("Snapshot lease update failed : %v", err)
}
}
Expand Down
60 changes: 60 additions & 0 deletions pkg/snapshot/snapshotter/fullsnapshotleaseupdate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0

package snapshotter

import (
"context"
"time"

"github.com/gardener/etcd-backup-restore/pkg/health/heartbeat"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/sirupsen/logrus"
)

// RenewFullSnapshotLeasePeriodically has a timer and will periodically call FullSnapshotCaseLeaseUpdate to renew the fullsnapshot lease until it is updated or stopped.
// The timer starts upon snapshotter initialization and is reset after every full snapshot is taken.
func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh chan struct{}, fullSnapshotLeaseUpdateInterval time.Duration) {
logger := logrus.NewEntry(logrus.New()).WithField("actor", "FullSnapLeaseUpdater")
ssr.FullSnapshotLeaseUpdateTimer = time.NewTimer(fullSnapshotLeaseUpdateInterval)
fullSnapshotLeaseUpdateCtx, fullSnapshotLeaseUpdateCancel := context.WithCancel(context.TODO())
defer func() {
fullSnapshotLeaseUpdateCancel()
if ssr.FullSnapshotLeaseUpdateTimer != nil {
ssr.FullSnapshotLeaseUpdateTimer.Stop()
ssr.FullSnapshotLeaseUpdateTimer = nil
}
}()
logger.Infof("Starting the FullSnapshot lease renewal with interval %v", fullSnapshotLeaseUpdateInterval)
for {
select {
case <-ssr.FullSnapshotLeaseUpdateTimer.C:
if ssr.PrevFullSnapshot != nil {
if err := func() error {
ctx, cancel := context.WithTimeout(fullSnapshotLeaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
defer cancel()
return heartbeat.FullSnapshotCaseLeaseUpdate(ctx, logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.HealthConfig.FullSnapshotLeaseName)
}(); err != nil {
//FullSnapshot lease update failed. Retry after interval
logger.Warnf("FullSnapshot lease update failed with error: %v", err)
logger.Infof("Resetting the FullSnapshot lease to retry updating with revision %d after %v", ssr.PrevFullSnapshot.LastRevision, fullSnapshotLeaseUpdateInterval)
ssr.FullSnapshotLeaseUpdateTimer.Stop()
ssr.FullSnapshotLeaseUpdateTimer.Reset(fullSnapshotLeaseUpdateInterval)
} else {
//FullSnapshot lease successfully updated. Stop the timer
logger.Infof("Stopping the FullSnapshot lease update")
ssr.FullSnapshotLeaseUpdateTimer.Stop()
}
} else {
//Skip the FullSnapshot lease update as no full snapshot has been taken yet. Reset the timer to retry after interval
ssr.FullSnapshotLeaseUpdateTimer.Stop()
ssr.FullSnapshotLeaseUpdateTimer.Reset(fullSnapshotLeaseUpdateInterval)
}

case <-FullSnapshotLeaseStopCh:
logger.Info("Closing the full snapshot lease renewal")
return
}
}
}
124 changes: 62 additions & 62 deletions pkg/snapshot/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,32 +82,33 @@ func NewSnapshotterConfig() *brtypes.SnapshotterConfig {

// Snapshotter is a struct for etcd snapshot taker
type Snapshotter struct {
logger *logrus.Entry
etcdConnectionConfig *brtypes.EtcdConnectionConfig
store brtypes.SnapStore
config *brtypes.SnapshotterConfig
compressionConfig *compressor.CompressionConfig
healthConfig *brtypes.HealthConfig
schedule cron.Schedule
PrevSnapshot *brtypes.Snapshot
PrevFullSnapshot *brtypes.Snapshot
PrevDeltaSnapshots brtypes.SnapList
fullSnapshotReqCh chan bool
deltaSnapshotReqCh chan struct{}
fullSnapshotAckCh chan result
deltaSnapshotAckCh chan result
fullSnapshotTimer *time.Timer
deltaSnapshotTimer *time.Timer
events []byte
watchCh clientv3.WatchChan
etcdWatchClient *clientv3.Watcher
cancelWatch context.CancelFunc
SsrStateMutex *sync.Mutex
SsrState brtypes.SnapshotterState
lastEventRevision int64
K8sClientset client.Client
snapstoreConfig *brtypes.SnapstoreConfig
lastSecretModifiedTime time.Time
logger *logrus.Entry
etcdConnectionConfig *brtypes.EtcdConnectionConfig
store brtypes.SnapStore
config *brtypes.SnapshotterConfig
compressionConfig *compressor.CompressionConfig
HealthConfig *brtypes.HealthConfig
schedule cron.Schedule
PrevSnapshot *brtypes.Snapshot
PrevFullSnapshot *brtypes.Snapshot
PrevDeltaSnapshots brtypes.SnapList
fullSnapshotReqCh chan bool
deltaSnapshotReqCh chan struct{}
fullSnapshotAckCh chan result
deltaSnapshotAckCh chan result
FullSnapshotLeaseUpdateTimer *time.Timer
fullSnapshotTimer *time.Timer
deltaSnapshotTimer *time.Timer
events []byte
watchCh clientv3.WatchChan
etcdWatchClient *clientv3.Watcher
cancelWatch context.CancelFunc
SsrStateMutex *sync.Mutex
SsrState brtypes.SnapshotterState
lastEventRevision int64
K8sClientset client.Client
snapstoreConfig *brtypes.SnapstoreConfig
lastSecretModifiedTime time.Time
}

// NewSnapshotter returns the snapshotter object.
Expand Down Expand Up @@ -153,29 +154,29 @@ func NewSnapshotter(logger *logrus.Entry, config *brtypes.SnapshotterConfig, sto
config: config,
etcdConnectionConfig: etcdConnectionConfig,
compressionConfig: compressionConfig,
healthConfig: healthConfig,

schedule: sdl,
PrevSnapshot: prevSnapshot,
PrevFullSnapshot: fullSnap,
PrevDeltaSnapshots: deltaSnapList,
SsrState: brtypes.SnapshotterInactive,
SsrStateMutex: &sync.Mutex{},
fullSnapshotReqCh: make(chan bool),
deltaSnapshotReqCh: make(chan struct{}),
fullSnapshotAckCh: make(chan result),
deltaSnapshotAckCh: make(chan result),
cancelWatch: func() {},
K8sClientset: clientSet,
snapstoreConfig: storeConfig,
HealthConfig: healthConfig,
schedule: sdl,
PrevSnapshot: prevSnapshot,
PrevFullSnapshot: fullSnap,
PrevDeltaSnapshots: deltaSnapList,
SsrState: brtypes.SnapshotterInactive,
SsrStateMutex: &sync.Mutex{},
fullSnapshotReqCh: make(chan bool),
deltaSnapshotReqCh: make(chan struct{}),
fullSnapshotAckCh: make(chan result),
deltaSnapshotAckCh: make(chan result),
cancelWatch: func() {},
K8sClientset: clientSet,
snapstoreConfig: storeConfig,
}, nil
}

// Run process loop for scheduled backup
// Setting startWithFullSnapshot to false will start the snapshotter without
// taking the first full snapshot.
func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool) error {
defer ssr.stop()
FullSnapshotLeaseStopCh := make(chan struct{})
defer ssr.stop(FullSnapshotLeaseStopCh)
if startWithFullSnapshot {
ssr.fullSnapshotTimer = time.NewTimer(0)
} else {
Expand All @@ -196,7 +197,9 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool)
return fmt.Errorf("failed to reset full snapshot timer: %v", err)
}
}

if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, brtypes.FullSnapshotLeaseUpdateInterval)
}
ssr.deltaSnapshotTimer = time.NewTimer(brtypes.DefaultDeltaSnapshotInterval)
if ssr.config.DeltaSnapshotPeriod.Duration >= brtypes.DeltaSnapshotIntervalThreshold {
ssr.deltaSnapshotTimer.Stop()
Expand Down Expand Up @@ -241,7 +244,7 @@ func (ssr *Snapshotter) TriggerDeltaSnapshot() (*brtypes.Snapshot, error) {

// stop stops the snapshotter. Once stopped any subsequent calls will
// not have any effect.
func (ssr *Snapshotter) stop() {
func (ssr *Snapshotter) stop(FullSnapshotLeaseStopCh chan struct{}) {
ssr.logger.Info("Closing the Snapshotter...")

if ssr.fullSnapshotTimer != nil {
Expand All @@ -252,6 +255,9 @@ func (ssr *Snapshotter) stop() {
ssr.deltaSnapshotTimer.Stop()
ssr.deltaSnapshotTimer = nil
}
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
FullSnapshotLeaseStopCh <- emptyStruct
}
ssr.SetSnapshotterInactive()
ssr.closeEtcdClient()
}
Expand Down Expand Up @@ -644,12 +650,9 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err = heartbeat.FullSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.healthConfig.FullSnapshotLeaseName, ssr.healthConfig.DeltaSnapshotLeaseName); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ssr.FullSnapshotLeaseUpdateTimer.Stop()
ssr.FullSnapshotLeaseUpdateTimer.Reset(time.Nanosecond)
}

case <-ssr.deltaSnapshotReqCh:
Expand All @@ -662,9 +665,9 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err = heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.healthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
if err = heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.HealthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand All @@ -674,22 +677,19 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if _, err := ssr.TakeFullSnapshotAndResetTimer(false); err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.healthConfig.FullSnapshotLeaseName, ssr.healthConfig.DeltaSnapshotLeaseName); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ssr.FullSnapshotLeaseUpdateTimer.Stop()
ssr.FullSnapshotLeaseUpdateTimer.Reset(time.Nanosecond)
}

case <-ssr.deltaSnapshotTimer.C:
if ssr.config.DeltaSnapshotPeriod.Duration >= time.Second {
if _, err := ssr.takeDeltaSnapshotAndResetTimer(); err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.healthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.HealthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand All @@ -704,11 +704,11 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error {
if err := ssr.handleDeltaWatchEvents(wr); err != nil {
return err
}
if ssr.healthConfig.SnapshotLeaseRenewalEnabled {
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
//Call UpdateDeltaSnapshotLease only if new delta snapshot taken
if snapshots < len(ssr.PrevDeltaSnapshots) {
ctx, cancel := context.WithTimeout(leaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.healthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
if err := heartbeat.DeltaSnapshotCaseLeaseUpdate(ctx, ssr.logger, ssr.K8sClientset, ssr.HealthConfig.DeltaSnapshotLeaseName, ssr.store); err != nil {
ssr.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand Down
Loading