Skip to content

Commit

Permalink
Fix the ProbeEtcd timeout. (#532)
Browse files Browse the repository at this point in the history
* Fix the ProbeEtcd timeout.

* Improved the logging while closing the snapshotter.
  • Loading branch information
ishan16696 authored Sep 14, 2022
1 parent 0caea43 commit 9f00fdb
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 10 deletions.
15 changes: 9 additions & 6 deletions pkg/server/backuprestoreserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype
// set "http handler" with the latest snapshotter object
handler.SetSnapshotter(ssr)
defragCallBack = ssr.TriggerFullSnapshot
go handleSsrStopRequest(leCtx, handler, ssr, ackCh, ssrStopCh)
go handleSsrStopRequest(leCtx, handler, ssr, ackCh, ssrStopCh, b.logger)
}
go b.runEtcdProbeLoopWithSnapshotter(leCtx, handler, ssr, ss, ssrStopCh, ackCh)
go defragmentor.DefragDataPeriodically(leCtx, b.config.EtcdConnectionConfig, b.defragmentationSchedule, defragCallBack, b.logger)
Expand Down Expand Up @@ -363,7 +363,7 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex
err = b.probeEtcd(ctx)
}
if err != nil {
b.logger.Errorf("Failed to probe etcd: %v", err)
b.logger.Errorf("failed to probe etcd: %v", err)
handler.SetStatus(http.StatusServiceUnavailable)
continue
}
Expand Down Expand Up @@ -570,6 +570,7 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex
// probeEtcd will make the snapshotter probe for etcd endpoint to be available
// before it starts taking regular snapshots.
func (b *BackupRestoreServer) probeEtcd(ctx context.Context) error {
b.logger.Info("Probing Etcd...")
var endPoint string
client, err := etcdutil.NewFactory(*b.config.EtcdConnectionConfig).NewMaintenance()
if err != nil {
Expand All @@ -579,19 +580,20 @@ func (b *BackupRestoreServer) probeEtcd(ctx context.Context) error {
}
defer client.Close()

ctx, cancel := context.WithTimeout(ctx, b.config.EtcdConnectionConfig.ConnectionTimeout.Duration)
ctx, cancel := context.WithTimeout(ctx, brtypes.DefaultEtcdStatusConnecTimeout)
defer cancel()

if len(b.config.EtcdConnectionConfig.Endpoints) > 0 {
endPoint = b.config.EtcdConnectionConfig.Endpoints[0]
} else {
return fmt.Errorf("etcd endpoints are not passed correctly")
}
_, err = client.Status(ctx, endPoint)
if err != nil {

if _, err := client.Status(ctx, endPoint); err != nil {
b.logger.Errorf("failed to get status of etcd endPoint: %v with error: %v", endPoint, err)
return err
}

return nil
}

Expand All @@ -605,12 +607,13 @@ func handleAckState(handler *HTTPHandler, ackCh chan struct{}) {
}

// handleSsrStopRequest responds to handlers request and stop interrupt.
func handleSsrStopRequest(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ackCh, ssrStopCh chan struct{}) {
func handleSsrStopRequest(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ackCh, ssrStopCh chan struct{}, logger *logrus.Entry) {
for {
var ok bool
select {
case _, ok = <-handler.ReqCh:
case _, ok = <-ctx.Done():
logger.Info("Stopping handleSsrStopRequest...")
}

ssr.SsrStateMutex.Lock()
Expand Down
3 changes: 2 additions & 1 deletion pkg/snapshot/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (ssr *Snapshotter) TriggerDeltaSnapshot() (*brtypes.Snapshot, error) {
return nil, fmt.Errorf("snapshotter is not active")
}
if ssr.config.DeltaSnapshotPeriod.Duration < brtypes.DeltaSnapshotIntervalThreshold {
return nil, fmt.Errorf("Found delta snapshot interval %s less than %v. Delta snapshotting is disabled. ", ssr.config.DeltaSnapshotPeriod.Duration, time.Duration(brtypes.DeltaSnapshotIntervalThreshold))
return nil, fmt.Errorf("found delta snapshot interval %s less than %v. Delta snapshotting is disabled. ", ssr.config.DeltaSnapshotPeriod.Duration, time.Duration(brtypes.DeltaSnapshotIntervalThreshold))
}
ssr.logger.Info("Triggering out of schedule delta snapshot...")
ssr.deltaSnapshotReqCh <- emptyStruct
Expand All @@ -231,6 +231,7 @@ func (ssr *Snapshotter) TriggerDeltaSnapshot() (*brtypes.Snapshot, error) {
// stop stops the snapshotter. Once stopped any subsequent calls will
// not have any effect.
func (ssr *Snapshotter) stop() {
ssr.logger.Info("Closing the Snapshotter...")
ssr.SsrStateMutex.Lock()
if ssr.fullSnapshotTimer != nil {
ssr.fullSnapshotTimer.Stop()
Expand Down
6 changes: 3 additions & 3 deletions pkg/types/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
const (
// DefaultReelectionPeriod defines default time period for Reelection.
DefaultReelectionPeriod = 5 * time.Second
// DefaultEtcdConnecTimeout defines default ConnectionTimeout for etcd client.
DefaultEtcdConnecTimeout = 5 * time.Second
// DefaultEtcdStatusConnecTimeout defines default ConnectionTimeout for etcd client to get Etcd endpoint status.
DefaultEtcdStatusConnecTimeout = 5 * time.Second
)

// LeaderCallbacks are callbacks that are triggered to start/stop the snapshottter when leader's currentState changes.
Expand Down Expand Up @@ -67,7 +67,7 @@ type Config struct {
func NewLeaderElectionConfig() *Config {
return &Config{
ReelectionPeriod: wrappers.Duration{Duration: DefaultReelectionPeriod},
EtcdConnectionTimeout: wrappers.Duration{Duration: DefaultEtcdConnecTimeout},
EtcdConnectionTimeout: wrappers.Duration{Duration: DefaultEtcdStatusConnecTimeout},
}
}

Expand Down

0 comments on commit 9f00fdb

Please sign in to comment.