From ed48157004c5a8e5b9c66f3028543bb2ed084afd Mon Sep 17 00:00:00 2001 From: Anvesh Reddy Pinnapureddy Date: Fri, 6 Sep 2024 16:19:54 +0530 Subject: [PATCH] update the full snapshot lease with renewTime set to the time full snapshot was taken instead of when it's renewed as part of retry method --- pkg/compactor/compactor.go | 2 +- pkg/health/heartbeat/heartbeat.go | 11 ++++++----- pkg/health/heartbeat/heartbeat_test.go | 6 +++--- pkg/server/backuprestoreserver.go | 2 +- .../snapshotter/fullsnapshotleaseupdate.go | 6 +++--- pkg/snapshot/snapshotter/snapshotter.go | 10 +++++----- pkg/snapshot/snapshotter/snapshotter_test.go | 16 ++++++++-------- 7 files changed, 27 insertions(+), 26 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 4f211606f..3f6d750b7 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -182,7 +182,7 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions) if opts.EnabledLeaseRenewal { // Update revisions in holder identity of full snapshot lease. ctx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration) - if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName); err != nil { + if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName, time.Now()); err != nil { cp.logger.Warnf("Snapshot lease update failed : %v", err) } cancel() diff --git a/pkg/health/heartbeat/heartbeat.go b/pkg/health/heartbeat/heartbeat.go index 4ff5b4400..2dfb3cf40 100644 --- a/pkg/health/heartbeat/heartbeat.go +++ b/pkg/health/heartbeat/heartbeat.go @@ -144,7 +144,7 @@ func (hb *Heartbeat) RenewMemberLease(ctx context.Context) error { } // UpdateFullSnapshotLease renews the full snapshot lease and updates the holderIdentity field with the last revision in the latest full snapshot. -func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error { +func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string, fullSnapshotTime time.Time) error { if k8sClientset == nil { return &errors.EtcdError{ Message: "nil clientset passed", @@ -191,7 +191,7 @@ func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnap } renewedLease := fullSnapLease.DeepCopy() - renewedTime := time.Now() + renewedTime := fullSnapshotTime renewedLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime} // Update revisions in fullSnapLease.Spec.HolderIdentity only when its value is less than latest fullSnap.LastRevision if fullSnapLease.Spec.HolderIdentity == nil || rev < fullSnapshot.LastRevision { @@ -204,7 +204,8 @@ func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnap return err } - logger.Info(logString, " at time ", renewedTime) + logger.Info(logString, " at time ", time.Now()) + logger.Info("Full snapshot lease's spec.renewTime is updated to the full snapshot taken time i.e ", fullSnapshotTime) return nil }); err != nil { return &errors.EtcdError{ @@ -278,8 +279,8 @@ func UpdateDeltaSnapshotLease(ctx context.Context, logger *logrus.Entry, prevDel } // FullSnapshotCaseLeaseUpdate Updates the fullsnapshot lease as needed when a full snapshot is taken -func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error { - if err := UpdateFullSnapshotLease(ctx, logger, fullSnapshot, k8sClientset, fullSnapshotLeaseName); err != nil { +func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string, fullSnapshotTime time.Time) error { + if err := UpdateFullSnapshotLease(ctx, logger, fullSnapshot, k8sClientset, fullSnapshotLeaseName, fullSnapshotTime); err != nil { return &errors.EtcdError{ Message: fmt.Sprintf("Failed to update full snapshot lease: %v", err), } diff --git a/pkg/health/heartbeat/heartbeat_test.go b/pkg/health/heartbeat/heartbeat_test.go index 9c1c508be..8cfaf527d 100644 --- a/pkg/health/heartbeat/heartbeat_test.go +++ b/pkg/health/heartbeat/heartbeat_test.go @@ -120,7 +120,7 @@ var _ = Describe("Heartbeat", func() { Expect(err).ShouldNot(HaveOccurred()) // Update full snapshot lease with the first full snapshot - err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, prevFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName) + err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, prevFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName, time.Now()) Expect(err).ShouldNot(HaveOccurred()) l := &v1.Lease{} @@ -131,7 +131,7 @@ var _ = Describe("Heartbeat", func() { Expect(l.Spec.HolderIdentity).To(PointTo(Equal("980"))) // Trigger full snapshot lease update with latest full snapshot which is not the first full snapshot - err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, latestFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName) + err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, latestFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName, time.Now()) Expect(err).ShouldNot(HaveOccurred()) l = &v1.Lease{} @@ -150,7 +150,7 @@ var _ = Describe("Heartbeat", func() { Expect(k8sClientset.Create(context.TODO(), lease)).To(Succeed()) - err := heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName) + err := heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName, time.Now()) Expect(err).Should(HaveOccurred()) err = k8sClientset.Delete(context.TODO(), lease) diff --git a/pkg/server/backuprestoreserver.go b/pkg/server/backuprestoreserver.go index 0137ffb90..8e69565a0 100644 --- a/pkg/server/backuprestoreserver.go +++ b/pkg/server/backuprestoreserver.go @@ -423,7 +423,7 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex if b.config.HealthConfig.SnapshotLeaseRenewalEnabled { leaseUpdatectx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration) defer cancel() - if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName); err != nil { + if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName, time.Now()); err != nil { b.logger.Warnf("Snapshot lease update failed : %v", err) } } diff --git a/pkg/snapshot/snapshotter/fullsnapshotleaseupdate.go b/pkg/snapshot/snapshotter/fullsnapshotleaseupdate.go index 0a5322118..368acae64 100644 --- a/pkg/snapshot/snapshotter/fullsnapshotleaseupdate.go +++ b/pkg/snapshot/snapshotter/fullsnapshotleaseupdate.go @@ -15,7 +15,7 @@ import ( // RenewFullSnapshotLeasePeriodically has a timer and will periodically call FullSnapshotCaseLeaseUpdate to renew the fullsnapshot lease until it is updated or stopped. // The timer starts upon snapshotter initialization and is reset after every full snapshot is taken. -func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh chan struct{}, fullSnapshotLeaseUpdateInterval time.Duration) { +func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh chan struct{}, fullSnapshotLeaseUpdateInterval time.Duration) { logger := logrus.NewEntry(logrus.New()).WithField("actor", "FullSnapLeaseUpdater") ssr.FullSnapshotLeaseUpdateTimer = time.NewTimer(fullSnapshotLeaseUpdateInterval) fullSnapshotLeaseUpdateCtx, fullSnapshotLeaseUpdateCancel := context.WithCancel(context.TODO()) @@ -34,7 +34,7 @@ func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStop if err := func() error { ctx, cancel := context.WithTimeout(fullSnapshotLeaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration) defer cancel() - return heartbeat.FullSnapshotCaseLeaseUpdate(ctx, logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.HealthConfig.FullSnapshotLeaseName) + return heartbeat.FullSnapshotCaseLeaseUpdate(ctx, logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.HealthConfig.FullSnapshotLeaseName, ssr.PrevFullSnapshot.CreatedOn) }(); err != nil { //FullSnapshot lease update failed. Retry after interval logger.Warnf("FullSnapshot lease update failed with error: %v", err) @@ -52,7 +52,7 @@ func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStop ssr.FullSnapshotLeaseUpdateTimer.Reset(fullSnapshotLeaseUpdateInterval) } - case <-FullSnapshotLeaseStopCh: + case <-fullSnapshotLeaseStopCh: logger.Info("Closing the full snapshot lease renewal") return } diff --git a/pkg/snapshot/snapshotter/snapshotter.go b/pkg/snapshot/snapshotter/snapshotter.go index 1dac10c32..c3b61e08c 100644 --- a/pkg/snapshot/snapshotter/snapshotter.go +++ b/pkg/snapshot/snapshotter/snapshotter.go @@ -165,8 +165,8 @@ func NewSnapshotter(logger *logrus.Entry, config *brtypes.SnapshotterConfig, sto // Setting startWithFullSnapshot to false will start the snapshotter without // taking the first full snapshot. func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool) error { - FullSnapshotLeaseStopCh := make(chan struct{}) - defer ssr.stop(FullSnapshotLeaseStopCh) + fullSnapshotLeaseStopCh := make(chan struct{}) + defer ssr.stop(fullSnapshotLeaseStopCh) if startWithFullSnapshot { ssr.fullSnapshotTimer = time.NewTimer(0) } else { @@ -188,7 +188,7 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool) } } if ssr.HealthConfig.SnapshotLeaseRenewalEnabled { - go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, brtypes.FullSnapshotLeaseUpdateInterval) + go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, brtypes.FullSnapshotLeaseUpdateInterval) } ssr.deltaSnapshotTimer = time.NewTimer(brtypes.DefaultDeltaSnapshotInterval) if ssr.config.DeltaSnapshotPeriod.Duration >= brtypes.DeltaSnapshotIntervalThreshold { @@ -234,7 +234,7 @@ func (ssr *Snapshotter) TriggerDeltaSnapshot() (*brtypes.Snapshot, error) { // stop stops the snapshotter. Once stopped any subsequent calls will // not have any effect. -func (ssr *Snapshotter) stop(FullSnapshotLeaseStopCh chan struct{}) { +func (ssr *Snapshotter) stop(fullSnapshotLeaseStopCh chan struct{}) { ssr.logger.Info("Closing the Snapshotter...") if ssr.fullSnapshotTimer != nil { @@ -246,7 +246,7 @@ func (ssr *Snapshotter) stop(FullSnapshotLeaseStopCh chan struct{}) { ssr.deltaSnapshotTimer = nil } if ssr.HealthConfig.SnapshotLeaseRenewalEnabled { - FullSnapshotLeaseStopCh <- emptyStruct + fullSnapshotLeaseStopCh <- emptyStruct } ssr.SetSnapshotterInactive() ssr.closeEtcdClient() diff --git a/pkg/snapshot/snapshotter/snapshotter_test.go b/pkg/snapshot/snapshotter/snapshotter_test.go index 85e2092b2..c9795c2b7 100644 --- a/pkg/snapshot/snapshotter/snapshotter_test.go +++ b/pkg/snapshot/snapshotter/snapshotter_test.go @@ -902,7 +902,7 @@ var _ = Describe("Snapshotter", func() { var ( ssr *Snapshotter lease *v1.Lease - FullSnapshotLeaseStopCh chan struct{} + fullSnapshotLeaseStopCh chan struct{} ctx context.Context cancel context.CancelFunc fullSnapshotLeaseUpdateInterval time.Duration @@ -932,7 +932,7 @@ var _ = Describe("Snapshotter", func() { ssr.PrevFullSnapshot = nil ssr.K8sClientset = fake.NewClientBuilder().Build() ssr.HealthConfig.SnapshotLeaseRenewalEnabled = true - FullSnapshotLeaseStopCh = make(chan struct{}) + fullSnapshotLeaseStopCh = make(chan struct{}) }) AfterEach(func() { Expect(os.Unsetenv("POD_NAME")).To(Succeed()) @@ -947,9 +947,9 @@ var _ = Describe("Snapshotter", func() { Expect(err).ShouldNot(HaveOccurred()) fullSnapshotLeaseUpdateInterval = 2 * time.Second - go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval) + go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval) time.Sleep(2 * time.Second) - close(FullSnapshotLeaseStopCh) + close(fullSnapshotLeaseStopCh) l := &v1.Lease{} Expect(ssr.K8sClientset.Get(ctx, client.ObjectKey{ @@ -976,9 +976,9 @@ var _ = Describe("Snapshotter", func() { Expect(err).ShouldNot(HaveOccurred()) fullSnapshotLeaseUpdateInterval = time.Second - go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval) + go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval) time.Sleep(2 * time.Second) - close(FullSnapshotLeaseStopCh) + close(fullSnapshotLeaseStopCh) l := &v1.Lease{} Expect(ssr.K8sClientset.Get(ctx, client.ObjectKey{ @@ -1000,7 +1000,7 @@ var _ = Describe("Snapshotter", func() { prevFullSnap.GenerateSnapshotName() ssr.PrevFullSnapshot = prevFullSnap fullSnapshotLeaseUpdateInterval = 3 * time.Second - go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval) + go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval) time.Sleep(time.Second) err := ssr.K8sClientset.Create(ctx, lease) Expect(err).ShouldNot(HaveOccurred()) @@ -1018,7 +1018,7 @@ var _ = Describe("Snapshotter", func() { Name: lease.Name, }, l)).To(Succeed()) Expect(*l.Spec.HolderIdentity).To(Equal(strconv.FormatInt(prevFullSnap.LastRevision, 10))) - close(FullSnapshotLeaseStopCh) + close(fullSnapshotLeaseStopCh) }) }) })