From 9f00fdb76c0c1edd77934c003f4ad71546ff8763 Mon Sep 17 00:00:00 2001 From: Ishan Tyagi <42602577+ishan16696@users.noreply.github.com> Date: Wed, 14 Sep 2022 19:04:31 +0530 Subject: [PATCH] Fix the ProbeEtcd timeout. (#532) * Fix the ProbeEtcd timeout. * Improved the logging while closing the snapshotter. --- pkg/server/backuprestoreserver.go | 15 +++++++++------ pkg/snapshot/snapshotter/snapshotter.go | 3 ++- pkg/types/leaderelection.go | 6 +++--- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/pkg/server/backuprestoreserver.go b/pkg/server/backuprestoreserver.go index e0ba2014d..7950b3987 100644 --- a/pkg/server/backuprestoreserver.go +++ b/pkg/server/backuprestoreserver.go @@ -264,7 +264,7 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype // set "http handler" with the latest snapshotter object handler.SetSnapshotter(ssr) defragCallBack = ssr.TriggerFullSnapshot - go handleSsrStopRequest(leCtx, handler, ssr, ackCh, ssrStopCh) + go handleSsrStopRequest(leCtx, handler, ssr, ackCh, ssrStopCh, b.logger) } go b.runEtcdProbeLoopWithSnapshotter(leCtx, handler, ssr, ss, ssrStopCh, ackCh) go defragmentor.DefragDataPeriodically(leCtx, b.config.EtcdConnectionConfig, b.defragmentationSchedule, defragCallBack, b.logger) @@ -363,7 +363,7 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex err = b.probeEtcd(ctx) } if err != nil { - b.logger.Errorf("Failed to probe etcd: %v", err) + b.logger.Errorf("failed to probe etcd: %v", err) handler.SetStatus(http.StatusServiceUnavailable) continue } @@ -570,6 +570,7 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex // probeEtcd will make the snapshotter probe for etcd endpoint to be available // before it starts taking regular snapshots. func (b *BackupRestoreServer) probeEtcd(ctx context.Context) error { + b.logger.Info("Probing Etcd...") var endPoint string client, err := etcdutil.NewFactory(*b.config.EtcdConnectionConfig).NewMaintenance() if err != nil { @@ -579,7 +580,7 @@ func (b *BackupRestoreServer) probeEtcd(ctx context.Context) error { } defer client.Close() - ctx, cancel := context.WithTimeout(ctx, b.config.EtcdConnectionConfig.ConnectionTimeout.Duration) + ctx, cancel := context.WithTimeout(ctx, brtypes.DefaultEtcdStatusConnecTimeout) defer cancel() if len(b.config.EtcdConnectionConfig.Endpoints) > 0 { @@ -587,11 +588,12 @@ func (b *BackupRestoreServer) probeEtcd(ctx context.Context) error { } else { return fmt.Errorf("etcd endpoints are not passed correctly") } - _, err = client.Status(ctx, endPoint) - if err != nil { + + if _, err := client.Status(ctx, endPoint); err != nil { b.logger.Errorf("failed to get status of etcd endPoint: %v with error: %v", endPoint, err) return err } + return nil } @@ -605,12 +607,13 @@ func handleAckState(handler *HTTPHandler, ackCh chan struct{}) { } // handleSsrStopRequest responds to handlers request and stop interrupt. -func handleSsrStopRequest(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ackCh, ssrStopCh chan struct{}) { +func handleSsrStopRequest(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ackCh, ssrStopCh chan struct{}, logger *logrus.Entry) { for { var ok bool select { case _, ok = <-handler.ReqCh: case _, ok = <-ctx.Done(): + logger.Info("Stopping handleSsrStopRequest...") } ssr.SsrStateMutex.Lock() diff --git a/pkg/snapshot/snapshotter/snapshotter.go b/pkg/snapshot/snapshotter/snapshotter.go index 37880b72b..8cef2301d 100644 --- a/pkg/snapshot/snapshotter/snapshotter.go +++ b/pkg/snapshot/snapshotter/snapshotter.go @@ -220,7 +220,7 @@ func (ssr *Snapshotter) TriggerDeltaSnapshot() (*brtypes.Snapshot, error) { return nil, fmt.Errorf("snapshotter is not active") } if ssr.config.DeltaSnapshotPeriod.Duration < brtypes.DeltaSnapshotIntervalThreshold { - return nil, fmt.Errorf("Found delta snapshot interval %s less than %v. Delta snapshotting is disabled. ", ssr.config.DeltaSnapshotPeriod.Duration, time.Duration(brtypes.DeltaSnapshotIntervalThreshold)) + return nil, fmt.Errorf("found delta snapshot interval %s less than %v. Delta snapshotting is disabled. ", ssr.config.DeltaSnapshotPeriod.Duration, time.Duration(brtypes.DeltaSnapshotIntervalThreshold)) } ssr.logger.Info("Triggering out of schedule delta snapshot...") ssr.deltaSnapshotReqCh <- emptyStruct @@ -231,6 +231,7 @@ func (ssr *Snapshotter) TriggerDeltaSnapshot() (*brtypes.Snapshot, error) { // stop stops the snapshotter. Once stopped any subsequent calls will // not have any effect. func (ssr *Snapshotter) stop() { + ssr.logger.Info("Closing the Snapshotter...") ssr.SsrStateMutex.Lock() if ssr.fullSnapshotTimer != nil { ssr.fullSnapshotTimer.Stop() diff --git a/pkg/types/leaderelection.go b/pkg/types/leaderelection.go index f4438c787..bb5511314 100644 --- a/pkg/types/leaderelection.go +++ b/pkg/types/leaderelection.go @@ -27,8 +27,8 @@ import ( const ( // DefaultReelectionPeriod defines default time period for Reelection. DefaultReelectionPeriod = 5 * time.Second - // DefaultEtcdConnecTimeout defines default ConnectionTimeout for etcd client. - DefaultEtcdConnecTimeout = 5 * time.Second + // DefaultEtcdStatusConnecTimeout defines default ConnectionTimeout for etcd client to get Etcd endpoint status. + DefaultEtcdStatusConnecTimeout = 5 * time.Second ) // LeaderCallbacks are callbacks that are triggered to start/stop the snapshottter when leader's currentState changes. @@ -67,7 +67,7 @@ type Config struct { func NewLeaderElectionConfig() *Config { return &Config{ ReelectionPeriod: wrappers.Duration{Duration: DefaultReelectionPeriod}, - EtcdConnectionTimeout: wrappers.Duration{Duration: DefaultEtcdConnecTimeout}, + EtcdConnectionTimeout: wrappers.Duration{Duration: DefaultEtcdStatusConnecTimeout}, } }