Skip to content

Commit

Permalink
update the full snapshot lease with renewTime set to the time full sn…
Browse files Browse the repository at this point in the history
…apshot was taken instead of when it's renewed as part of retry method
  • Loading branch information
anveshreddy18 committed Sep 6, 2024
1 parent 2ac94d2 commit ed48157
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)
if opts.EnabledLeaseRenewal {
// Update revisions in holder identity of full snapshot lease.
ctx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName); err != nil {
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName, time.Now()); err != nil {
cp.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand Down
11 changes: 6 additions & 5 deletions pkg/health/heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (hb *Heartbeat) RenewMemberLease(ctx context.Context) error {
}

// UpdateFullSnapshotLease renews the full snapshot lease and updates the holderIdentity field with the last revision in the latest full snapshot.
func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error {
func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string, fullSnapshotTime time.Time) error {
if k8sClientset == nil {
return &errors.EtcdError{
Message: "nil clientset passed",
Expand Down Expand Up @@ -191,7 +191,7 @@ func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnap
}

renewedLease := fullSnapLease.DeepCopy()
renewedTime := time.Now()
renewedTime := fullSnapshotTime
renewedLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime}
// Update revisions in fullSnapLease.Spec.HolderIdentity only when its value is less than latest fullSnap.LastRevision
if fullSnapLease.Spec.HolderIdentity == nil || rev < fullSnapshot.LastRevision {
Expand All @@ -204,7 +204,8 @@ func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnap
return err
}

logger.Info(logString, " at time ", renewedTime)
logger.Info(logString, " at time ", time.Now())
logger.Info("Full snapshot lease's spec.renewTime is updated to the full snapshot taken time i.e ", fullSnapshotTime)
return nil
}); err != nil {
return &errors.EtcdError{
Expand Down Expand Up @@ -278,8 +279,8 @@ func UpdateDeltaSnapshotLease(ctx context.Context, logger *logrus.Entry, prevDel
}

// FullSnapshotCaseLeaseUpdate Updates the fullsnapshot lease as needed when a full snapshot is taken
func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error {
if err := UpdateFullSnapshotLease(ctx, logger, fullSnapshot, k8sClientset, fullSnapshotLeaseName); err != nil {
func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string, fullSnapshotTime time.Time) error {
if err := UpdateFullSnapshotLease(ctx, logger, fullSnapshot, k8sClientset, fullSnapshotLeaseName, fullSnapshotTime); err != nil {
return &errors.EtcdError{
Message: fmt.Sprintf("Failed to update full snapshot lease: %v", err),
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/health/heartbeat/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ var _ = Describe("Heartbeat", func() {
Expect(err).ShouldNot(HaveOccurred())

// Update full snapshot lease with the first full snapshot
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, prevFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, prevFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName, time.Now())
Expect(err).ShouldNot(HaveOccurred())

l := &v1.Lease{}
Expand All @@ -131,7 +131,7 @@ var _ = Describe("Heartbeat", func() {
Expect(l.Spec.HolderIdentity).To(PointTo(Equal("980")))

// Trigger full snapshot lease update with latest full snapshot which is not the first full snapshot
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, latestFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, latestFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName, time.Now())
Expect(err).ShouldNot(HaveOccurred())

l = &v1.Lease{}
Expand All @@ -150,7 +150,7 @@ var _ = Describe("Heartbeat", func() {

Expect(k8sClientset.Create(context.TODO(), lease)).To(Succeed())

err := heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
err := heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName, time.Now())
Expect(err).Should(HaveOccurred())

err = k8sClientset.Delete(context.TODO(), lease)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/backuprestoreserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex
if b.config.HealthConfig.SnapshotLeaseRenewalEnabled {
leaseUpdatectx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration)
defer cancel()
if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName); err != nil {
if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName, time.Now()); err != nil {
b.logger.Warnf("Snapshot lease update failed : %v", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/snapshot/snapshotter/fullsnapshotleaseupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// RenewFullSnapshotLeasePeriodically has a timer and will periodically call FullSnapshotCaseLeaseUpdate to renew the fullsnapshot lease until it is updated or stopped.
// The timer starts upon snapshotter initialization and is reset after every full snapshot is taken.
func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh chan struct{}, fullSnapshotLeaseUpdateInterval time.Duration) {
func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh chan struct{}, fullSnapshotLeaseUpdateInterval time.Duration) {
logger := logrus.NewEntry(logrus.New()).WithField("actor", "FullSnapLeaseUpdater")
ssr.FullSnapshotLeaseUpdateTimer = time.NewTimer(fullSnapshotLeaseUpdateInterval)
fullSnapshotLeaseUpdateCtx, fullSnapshotLeaseUpdateCancel := context.WithCancel(context.TODO())
Expand All @@ -34,7 +34,7 @@ func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStop
if err := func() error {
ctx, cancel := context.WithTimeout(fullSnapshotLeaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
defer cancel()
return heartbeat.FullSnapshotCaseLeaseUpdate(ctx, logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.HealthConfig.FullSnapshotLeaseName)
return heartbeat.FullSnapshotCaseLeaseUpdate(ctx, logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.HealthConfig.FullSnapshotLeaseName, ssr.PrevFullSnapshot.CreatedOn)
}(); err != nil {
//FullSnapshot lease update failed. Retry after interval
logger.Warnf("FullSnapshot lease update failed with error: %v", err)
Expand All @@ -52,7 +52,7 @@ func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStop
ssr.FullSnapshotLeaseUpdateTimer.Reset(fullSnapshotLeaseUpdateInterval)
}

case <-FullSnapshotLeaseStopCh:
case <-fullSnapshotLeaseStopCh:
logger.Info("Closing the full snapshot lease renewal")
return
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/snapshot/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func NewSnapshotter(logger *logrus.Entry, config *brtypes.SnapshotterConfig, sto
// Setting startWithFullSnapshot to false will start the snapshotter without
// taking the first full snapshot.
func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool) error {
FullSnapshotLeaseStopCh := make(chan struct{})
defer ssr.stop(FullSnapshotLeaseStopCh)
fullSnapshotLeaseStopCh := make(chan struct{})
defer ssr.stop(fullSnapshotLeaseStopCh)
if startWithFullSnapshot {
ssr.fullSnapshotTimer = time.NewTimer(0)
} else {
Expand All @@ -188,7 +188,7 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool)
}
}
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, brtypes.FullSnapshotLeaseUpdateInterval)
go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, brtypes.FullSnapshotLeaseUpdateInterval)
}
ssr.deltaSnapshotTimer = time.NewTimer(brtypes.DefaultDeltaSnapshotInterval)
if ssr.config.DeltaSnapshotPeriod.Duration >= brtypes.DeltaSnapshotIntervalThreshold {
Expand Down Expand Up @@ -234,7 +234,7 @@ func (ssr *Snapshotter) TriggerDeltaSnapshot() (*brtypes.Snapshot, error) {

// stop stops the snapshotter. Once stopped any subsequent calls will
// not have any effect.
func (ssr *Snapshotter) stop(FullSnapshotLeaseStopCh chan struct{}) {
func (ssr *Snapshotter) stop(fullSnapshotLeaseStopCh chan struct{}) {
ssr.logger.Info("Closing the Snapshotter...")

if ssr.fullSnapshotTimer != nil {
Expand All @@ -246,7 +246,7 @@ func (ssr *Snapshotter) stop(FullSnapshotLeaseStopCh chan struct{}) {
ssr.deltaSnapshotTimer = nil
}
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
FullSnapshotLeaseStopCh <- emptyStruct
fullSnapshotLeaseStopCh <- emptyStruct
}
ssr.SetSnapshotterInactive()
ssr.closeEtcdClient()
Expand Down
16 changes: 8 additions & 8 deletions pkg/snapshot/snapshotter/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ var _ = Describe("Snapshotter", func() {
var (
ssr *Snapshotter
lease *v1.Lease
FullSnapshotLeaseStopCh chan struct{}
fullSnapshotLeaseStopCh chan struct{}
ctx context.Context
cancel context.CancelFunc
fullSnapshotLeaseUpdateInterval time.Duration
Expand Down Expand Up @@ -932,7 +932,7 @@ var _ = Describe("Snapshotter", func() {
ssr.PrevFullSnapshot = nil
ssr.K8sClientset = fake.NewClientBuilder().Build()
ssr.HealthConfig.SnapshotLeaseRenewalEnabled = true
FullSnapshotLeaseStopCh = make(chan struct{})
fullSnapshotLeaseStopCh = make(chan struct{})
})
AfterEach(func() {
Expect(os.Unsetenv("POD_NAME")).To(Succeed())
Expand All @@ -947,9 +947,9 @@ var _ = Describe("Snapshotter", func() {
Expect(err).ShouldNot(HaveOccurred())

fullSnapshotLeaseUpdateInterval = 2 * time.Second
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
time.Sleep(2 * time.Second)
close(FullSnapshotLeaseStopCh)
close(fullSnapshotLeaseStopCh)

l := &v1.Lease{}
Expect(ssr.K8sClientset.Get(ctx, client.ObjectKey{
Expand All @@ -976,9 +976,9 @@ var _ = Describe("Snapshotter", func() {
Expect(err).ShouldNot(HaveOccurred())

fullSnapshotLeaseUpdateInterval = time.Second
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
time.Sleep(2 * time.Second)
close(FullSnapshotLeaseStopCh)
close(fullSnapshotLeaseStopCh)

l := &v1.Lease{}
Expect(ssr.K8sClientset.Get(ctx, client.ObjectKey{
Expand All @@ -1000,7 +1000,7 @@ var _ = Describe("Snapshotter", func() {
prevFullSnap.GenerateSnapshotName()
ssr.PrevFullSnapshot = prevFullSnap
fullSnapshotLeaseUpdateInterval = 3 * time.Second
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
time.Sleep(time.Second)
err := ssr.K8sClientset.Create(ctx, lease)
Expect(err).ShouldNot(HaveOccurred())
Expand All @@ -1018,7 +1018,7 @@ var _ = Describe("Snapshotter", func() {
Name: lease.Name,
}, l)).To(Succeed())
Expect(*l.Spec.HolderIdentity).To(Equal(strconv.FormatInt(prevFullSnap.LastRevision, 10)))
close(FullSnapshotLeaseStopCh)
close(fullSnapshotLeaseStopCh)
})
})
})
Expand Down

0 comments on commit ed48157

Please sign in to comment.