Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the full snapshot lease with renewTime set to the time FullSnapshot is taken #753

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)
if opts.EnabledLeaseRenewal {
// Update revisions in holder identity of full snapshot lease.
ctx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration)
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName); err != nil {
if err := heartbeat.FullSnapshotCaseLeaseUpdate(ctx, cp.logger, snapshot, cp.k8sClientset, opts.FullSnapshotLeaseName, snapshot.CreatedOn); err != nil {
cp.logger.Warnf("Snapshot lease update failed : %v", err)
}
cancel()
Expand Down
11 changes: 6 additions & 5 deletions pkg/health/heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (hb *Heartbeat) RenewMemberLease(ctx context.Context) error {
}

// UpdateFullSnapshotLease renews the full snapshot lease and updates the holderIdentity field with the last revision in the latest full snapshot.
func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error {
func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string, fullSnapshotTime time.Time) error {
anveshreddy18 marked this conversation as resolved.
Show resolved Hide resolved
if k8sClientset == nil {
return &errors.EtcdError{
Message: "nil clientset passed",
Expand Down Expand Up @@ -191,7 +191,7 @@ func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnap
}

renewedLease := fullSnapLease.DeepCopy()
renewedTime := time.Now()
renewedTime := fullSnapshotTime
renewedLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime}
// Update revisions in fullSnapLease.Spec.HolderIdentity only when its value is less than latest fullSnap.LastRevision
if fullSnapLease.Spec.HolderIdentity == nil || rev < fullSnapshot.LastRevision {
Expand All @@ -204,7 +204,8 @@ func UpdateFullSnapshotLease(ctx context.Context, logger *logrus.Entry, fullSnap
return err
}

logger.Info(logString, " at time ", renewedTime)
logger.Info(logString, " at time ", time.Now())
logger.Info("Full snapshot lease's spec.renewTime is updated to the full snapshot taken time i.e ", fullSnapshotTime)
return nil
}); err != nil {
return &errors.EtcdError{
Expand Down Expand Up @@ -278,8 +279,8 @@ func UpdateDeltaSnapshotLease(ctx context.Context, logger *logrus.Entry, prevDel
}

// FullSnapshotCaseLeaseUpdate Updates the fullsnapshot lease as needed when a full snapshot is taken
func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string) error {
if err := UpdateFullSnapshotLease(ctx, logger, fullSnapshot, k8sClientset, fullSnapshotLeaseName); err != nil {
func FullSnapshotCaseLeaseUpdate(ctx context.Context, logger *logrus.Entry, fullSnapshot *brtypes.Snapshot, k8sClientset client.Client, fullSnapshotLeaseName string, fullSnapshotTime time.Time) error {
anveshreddy18 marked this conversation as resolved.
Show resolved Hide resolved
if err := UpdateFullSnapshotLease(ctx, logger, fullSnapshot, k8sClientset, fullSnapshotLeaseName, fullSnapshotTime); err != nil {
return &errors.EtcdError{
Message: fmt.Sprintf("Failed to update full snapshot lease: %v", err),
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/health/heartbeat/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ var _ = Describe("Heartbeat", func() {
Expect(err).ShouldNot(HaveOccurred())

// Update full snapshot lease with the first full snapshot
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, prevFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, prevFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName, time.Now())
Expect(err).ShouldNot(HaveOccurred())

l := &v1.Lease{}
Expand All @@ -131,7 +131,7 @@ var _ = Describe("Heartbeat", func() {
Expect(l.Spec.HolderIdentity).To(PointTo(Equal("980")))

// Trigger full snapshot lease update with latest full snapshot which is not the first full snapshot
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, latestFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
err = heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, latestFullSnap, k8sClientset, brtypes.DefaultFullSnapshotLeaseName, time.Now())
Expect(err).ShouldNot(HaveOccurred())

l = &v1.Lease{}
Expand All @@ -150,7 +150,7 @@ var _ = Describe("Heartbeat", func() {

Expect(k8sClientset.Create(context.TODO(), lease)).To(Succeed())

err := heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName)
err := heartbeat.UpdateFullSnapshotLease(context.TODO(), logger, nil, k8sClientset, brtypes.DefaultFullSnapshotLeaseName, time.Now())
Expect(err).Should(HaveOccurred())

err = k8sClientset.Delete(context.TODO(), lease)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/backuprestoreserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex
if b.config.HealthConfig.SnapshotLeaseRenewalEnabled {
leaseUpdatectx, cancel := context.WithTimeout(ctx, brtypes.LeaseUpdateTimeoutDuration)
defer cancel()
if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName); err != nil {
if err = heartbeat.FullSnapshotCaseLeaseUpdate(leaseUpdatectx, b.logger, snapshot, ssr.K8sClientset, b.config.HealthConfig.FullSnapshotLeaseName, snapshot.CreatedOn); err != nil {
b.logger.Warnf("Snapshot lease update failed : %v", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/snapshot/snapshotter/fullsnapshotleaseupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// RenewFullSnapshotLeasePeriodically has a timer and will periodically call FullSnapshotCaseLeaseUpdate to renew the fullsnapshot lease until it is updated or stopped.
// The timer starts upon snapshotter initialization and is reset after every full snapshot is taken.
func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh chan struct{}, fullSnapshotLeaseUpdateInterval time.Duration) {
func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh chan struct{}, fullSnapshotLeaseUpdateInterval time.Duration) {
logger := logrus.NewEntry(logrus.New()).WithField("actor", "FullSnapLeaseUpdater")
ssr.FullSnapshotLeaseUpdateTimer = time.NewTimer(fullSnapshotLeaseUpdateInterval)
fullSnapshotLeaseUpdateCtx, fullSnapshotLeaseUpdateCancel := context.WithCancel(context.TODO())
Expand All @@ -34,7 +34,7 @@ func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStop
if err := func() error {
ctx, cancel := context.WithTimeout(fullSnapshotLeaseUpdateCtx, brtypes.LeaseUpdateTimeoutDuration)
defer cancel()
return heartbeat.FullSnapshotCaseLeaseUpdate(ctx, logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.HealthConfig.FullSnapshotLeaseName)
return heartbeat.FullSnapshotCaseLeaseUpdate(ctx, logger, ssr.PrevFullSnapshot, ssr.K8sClientset, ssr.HealthConfig.FullSnapshotLeaseName, ssr.PrevFullSnapshot.CreatedOn)
}(); err != nil {
//FullSnapshot lease update failed. Retry after interval
logger.Warnf("FullSnapshot lease update failed with error: %v", err)
Expand All @@ -52,7 +52,7 @@ func (ssr *Snapshotter) RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStop
ssr.FullSnapshotLeaseUpdateTimer.Reset(fullSnapshotLeaseUpdateInterval)
}

case <-FullSnapshotLeaseStopCh:
case <-fullSnapshotLeaseStopCh:
logger.Info("Closing the full snapshot lease renewal")
return
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/snapshot/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func NewSnapshotter(logger *logrus.Entry, config *brtypes.SnapshotterConfig, sto
// Setting startWithFullSnapshot to false will start the snapshotter without
// taking the first full snapshot.
func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool) error {
FullSnapshotLeaseStopCh := make(chan struct{})
defer ssr.stop(FullSnapshotLeaseStopCh)
fullSnapshotLeaseStopCh := make(chan struct{})
defer ssr.stop(fullSnapshotLeaseStopCh)
if startWithFullSnapshot {
ssr.fullSnapshotTimer = time.NewTimer(0)
} else {
Expand All @@ -188,7 +188,7 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool)
}
}
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, brtypes.FullSnapshotLeaseUpdateInterval)
go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, brtypes.FullSnapshotLeaseUpdateInterval)
}
ssr.deltaSnapshotTimer = time.NewTimer(brtypes.DefaultDeltaSnapshotInterval)
if ssr.config.DeltaSnapshotPeriod.Duration >= brtypes.DeltaSnapshotIntervalThreshold {
Expand Down Expand Up @@ -234,7 +234,7 @@ func (ssr *Snapshotter) TriggerDeltaSnapshot() (*brtypes.Snapshot, error) {

// stop stops the snapshotter. Once stopped any subsequent calls will
// not have any effect.
func (ssr *Snapshotter) stop(FullSnapshotLeaseStopCh chan struct{}) {
func (ssr *Snapshotter) stop(fullSnapshotLeaseStopCh chan struct{}) {
ssr.logger.Info("Closing the Snapshotter...")

if ssr.fullSnapshotTimer != nil {
Expand All @@ -246,7 +246,7 @@ func (ssr *Snapshotter) stop(FullSnapshotLeaseStopCh chan struct{}) {
ssr.deltaSnapshotTimer = nil
}
if ssr.HealthConfig.SnapshotLeaseRenewalEnabled {
FullSnapshotLeaseStopCh <- emptyStruct
fullSnapshotLeaseStopCh <- emptyStruct
}
ssr.SetSnapshotterInactive()
ssr.closeEtcdClient()
Expand Down
16 changes: 8 additions & 8 deletions pkg/snapshot/snapshotter/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ var _ = Describe("Snapshotter", func() {
var (
ssr *Snapshotter
lease *v1.Lease
FullSnapshotLeaseStopCh chan struct{}
fullSnapshotLeaseStopCh chan struct{}
ctx context.Context
cancel context.CancelFunc
fullSnapshotLeaseUpdateInterval time.Duration
Expand Down Expand Up @@ -932,7 +932,7 @@ var _ = Describe("Snapshotter", func() {
ssr.PrevFullSnapshot = nil
ssr.K8sClientset = fake.NewClientBuilder().Build()
ssr.HealthConfig.SnapshotLeaseRenewalEnabled = true
FullSnapshotLeaseStopCh = make(chan struct{})
fullSnapshotLeaseStopCh = make(chan struct{})
})
AfterEach(func() {
Expect(os.Unsetenv("POD_NAME")).To(Succeed())
Expand All @@ -947,9 +947,9 @@ var _ = Describe("Snapshotter", func() {
Expect(err).ShouldNot(HaveOccurred())

fullSnapshotLeaseUpdateInterval = 2 * time.Second
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
time.Sleep(2 * time.Second)
close(FullSnapshotLeaseStopCh)
close(fullSnapshotLeaseStopCh)

l := &v1.Lease{}
Expect(ssr.K8sClientset.Get(ctx, client.ObjectKey{
Expand All @@ -976,9 +976,9 @@ var _ = Describe("Snapshotter", func() {
Expect(err).ShouldNot(HaveOccurred())

fullSnapshotLeaseUpdateInterval = time.Second
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
time.Sleep(2 * time.Second)
close(FullSnapshotLeaseStopCh)
close(fullSnapshotLeaseStopCh)

l := &v1.Lease{}
Expect(ssr.K8sClientset.Get(ctx, client.ObjectKey{
Expand All @@ -1000,7 +1000,7 @@ var _ = Describe("Snapshotter", func() {
prevFullSnap.GenerateSnapshotName()
ssr.PrevFullSnapshot = prevFullSnap
fullSnapshotLeaseUpdateInterval = 3 * time.Second
go ssr.RenewFullSnapshotLeasePeriodically(FullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
go ssr.RenewFullSnapshotLeasePeriodically(fullSnapshotLeaseStopCh, fullSnapshotLeaseUpdateInterval)
time.Sleep(time.Second)
err := ssr.K8sClientset.Create(ctx, lease)
Expect(err).ShouldNot(HaveOccurred())
Expand All @@ -1018,7 +1018,7 @@ var _ = Describe("Snapshotter", func() {
Name: lease.Name,
}, l)).To(Succeed())
Expect(*l.Spec.HolderIdentity).To(Equal(strconv.FormatInt(prevFullSnap.LastRevision, 10)))
close(FullSnapshotLeaseStopCh)
close(fullSnapshotLeaseStopCh)
})
})
})
Expand Down