diff --git a/chart/etcd-backup-restore/templates/etcd-statefulset.yaml b/chart/etcd-backup-restore/templates/etcd-statefulset.yaml index 25a3a9cfc..760a50286 100644 --- a/chart/etcd-backup-restore/templates/etcd-statefulset.yaml +++ b/chart/etcd-backup-restore/templates/etcd-statefulset.yaml @@ -110,7 +110,7 @@ spec: - --defragmentation-schedule={{ .Values.backup.defragmentationSchedule }} {{- end }} - --etcd-connection-timeout={{ .Values.backup.etcdConnectionTimeout }} - - --delta-snapshot-period-seconds={{ int $.Values.backup.deltaSnapshotPeriodSeconds }} + - --delta-snapshot-period={{ $.Values.backup.deltaSnapshotPeriod }} - --delta-snapshot-memory-limit={{ int $.Values.backup.deltaSnapshotMemoryLimit }} {{- if and .Values.etcdAuth.username .Values.etcdAuth.password }} - --etcd-username={{ .Values.etcdAuth.username }} diff --git a/chart/etcd-backup-restore/values.yaml b/chart/etcd-backup-restore/values.yaml index b53fc3fa7..8d915f528 100644 --- a/chart/etcd-backup-restore/values.yaml +++ b/chart/etcd-backup-restore/values.yaml @@ -40,8 +40,8 @@ backup: # schedule is cron standard schedule to take full snapshots. schedule: "0 */1 * * *" - # deltaSnapshotPeriodSeconds is Period in seconds after which delta snapshot will be persisted. If this value is set to be lesser than 1, delta snapshotting will be disabled. - deltaSnapshotPeriodSeconds: 60 + # deltaSnapshotPeriod is the period after which a delta snapshot is persisted, given as a duration string (e.g. "60s"); it is passed verbatim to the --delta-snapshot-period flag. If this value is less than 1 second, delta snapshotting is disabled. + deltaSnapshotPeriod: "60s" # deltaSnapshotMemoryLimit is memory limit in bytes after which delta snapshots will be taken out of schedule. deltaSnapshotMemoryLimit: 104857600 #100MB @@ -53,7 +53,7 @@ backup: # maxBackups is the maximum number of backups to keep (may change in future). This is interpreted in case of garbageCollectionPolicy set to LimitBased. 
maxBackups: 7 - etcdConnectionTimeout: 300 + etcdConnectionTimeout: "30s" # etcdQuotaBytes used to Raise alarms when backend DB size exceeds the given quota bytes etcdQuotaBytes: 8589934592 #8GB diff --git a/cmd/server.go b/cmd/server.go index 3ee9cba38..518e30c0b 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -44,6 +44,7 @@ func NewServerCommand(ctx context.Context) *cobra.Command { Short: "start the http server with backup scheduler.", Long: `Server will keep listening for http request to deliver its functionality through http endpoints.`, Run: func(cmd *cobra.Command, args []string) { + printVersionInfo() var ( snapstoreConfig *snapstore.Config @@ -111,10 +112,10 @@ func NewServerCommand(ctx context.Context) *cobra.Command { fullSnapshotSchedule, ss, maxBackups, - deltaSnapshotIntervalSeconds, deltaSnapshotMemoryLimit, - time.Duration(etcdConnectionTimeout), - time.Duration(garbageCollectionPeriodSeconds), + deltaSnapshotInterval, + etcdConnectionTimeout, + garbageCollectionPeriod, garbageCollectionPolicy, tlsConfig) if err != nil { @@ -139,7 +140,7 @@ func NewServerCommand(ctx context.Context) *cobra.Command { logger.Fatalf("failed to parse defragmentation schedule: %v", err) return } - go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot, logrus.NewEntry(logger)) + go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, etcdConnectionTimeout, ssr.TriggerFullSnapshot, logrus.NewEntry(logger)) runEtcdProbeLoopWithSnapshotter(tlsConfig, handler, ssr, ssrStopCh, ctx.Done(), ackCh) return @@ -156,7 +157,7 @@ func NewServerCommand(ctx context.Context) *cobra.Command { logger.Fatalf("failed to parse defragmentation schedule: %v", err) return } - go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, time.Duration(etcdConnectionTimeout)*time.Second, nil, logrus.NewEntry(logger)) + go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, 
etcdConnectionTimeout, nil, logrus.NewEntry(logger)) runEtcdProbeLoopWithoutSnapshotter(tlsConfig, handler, ctx.Done(), ackCh) }, @@ -327,7 +328,7 @@ func ProbeEtcd(tlsConfig *etcdutil.TLSConfig) error { } } - ctx, cancel := context.WithTimeout(context.TODO(), time.Duration(etcdConnectionTimeout)*time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), etcdConnectionTimeout) defer cancel() if _, err := client.Get(ctx, "foo"); err != nil { logger.Errorf("Failed to connect to client: %v", err) diff --git a/cmd/snapshot.go b/cmd/snapshot.go index 09c57ac1f..7a22176dd 100644 --- a/cmd/snapshot.go +++ b/cmd/snapshot.go @@ -61,10 +61,10 @@ storing snapshots on various cloud storage providers as well as local disk locat fullSnapshotSchedule, ss, maxBackups, - deltaSnapshotIntervalSeconds, deltaSnapshotMemoryLimit, - time.Duration(etcdConnectionTimeout), - time.Duration(garbageCollectionPeriodSeconds), + deltaSnapshotInterval, + etcdConnectionTimeout, + garbageCollectionPeriod, garbageCollectionPolicy, tlsConfig) if err != nil { @@ -79,7 +79,7 @@ storing snapshots on various cloud storage providers as well as local disk locat logger.Fatalf("failed to parse defragmentation schedule: %v", err) return } - go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot, logrus.NewEntry(logger)) + go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, etcdConnectionTimeout, ssr.TriggerFullSnapshot, logrus.NewEntry(logger)) go ssr.RunGarbageCollector(ctx.Done()) if err := ssr.Run(ctx.Done(), true); err != nil { @@ -98,11 +98,11 @@ storing snapshots on various cloud storage providers as well as local disk locat func initializeSnapshotterFlags(cmd *cobra.Command) { cmd.Flags().StringSliceVarP(&etcdEndpoints, "endpoints", "e", []string{"127.0.0.1:2379"}, "comma separated list of etcd endpoints") cmd.Flags().StringVarP(&fullSnapshotSchedule, "schedule", "s", "* */1 * * *", "schedule for 
snapshots") - cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-period-seconds", "i", snapshotter.DefaultDeltaSnapshotIntervalSeconds, "Period in seconds after which delta snapshot will be persisted. If this value is set to be lesser than 1, delta snapshotting will be disabled.") + cmd.Flags().DurationVarP(&deltaSnapshotInterval, "delta-snapshot-period", "i", snapshotter.DefaultDeltaSnapshotInterval, "Period after which delta snapshot will be persisted. If this value is set to be lesser than 1 seconds, delta snapshotting will be disabled.") cmd.Flags().IntVar(&deltaSnapshotMemoryLimit, "delta-snapshot-memory-limit", snapshotter.DefaultDeltaSnapMemoryLimit, "memory limit after which delta snapshots will be taken") cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", snapshotter.DefaultMaxBackups, "maximum number of previous backups to keep") - cmd.Flags().IntVar(&etcdConnectionTimeout, "etcd-connection-timeout", 30, "etcd client connection timeout") - cmd.Flags().IntVar(&garbageCollectionPeriodSeconds, "garbage-collection-period-seconds", 60, "Period in seconds for garbage collecting old backups") + cmd.Flags().DurationVar(&etcdConnectionTimeout, "etcd-connection-timeout", 30*time.Second, "etcd client connection timeout") + cmd.Flags().DurationVar(&garbageCollectionPeriod, "garbage-collection-period", 60*time.Second, "Period for garbage collecting old backups") cmd.Flags().StringVar(&garbageCollectionPolicy, "garbage-collection-policy", snapshotter.GarbageCollectionPolicyExponential, "Policy for garbage collecting old backups") cmd.Flags().BoolVar(&insecureTransport, "insecure-transport", true, "disable transport security for client connections") cmd.Flags().BoolVar(&insecureSkipVerify, "insecure-skip-tls-verify", false, "skip server certificate verification") diff --git a/cmd/types.go b/cmd/types.go index c7acee004..f55461352 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -15,6 +15,8 @@ package cmd import ( + "time" + "github.com/sirupsen/logrus" 
) @@ -29,22 +31,22 @@ var ( logger = logrus.New() version bool //snapshotter flags - fullSnapshotSchedule string - etcdEndpoints []string - etcdUsername string - etcdPassword string - deltaSnapshotIntervalSeconds int - deltaSnapshotMemoryLimit int - maxBackups int - etcdConnectionTimeout int - garbageCollectionPeriodSeconds int - garbageCollectionPolicy string - insecureTransport bool - insecureSkipVerify bool - certFile string - keyFile string - caFile string - defragmentationSchedule string + fullSnapshotSchedule string + etcdEndpoints []string + etcdUsername string + etcdPassword string + deltaSnapshotMemoryLimit int + deltaSnapshotInterval time.Duration + etcdConnectionTimeout time.Duration + garbageCollectionPeriod time.Duration + garbageCollectionPolicy string + maxBackups int + insecureTransport bool + insecureSkipVerify bool + certFile string + keyFile string + caFile string + defragmentationSchedule string //server flags port int diff --git a/doc/usage/getting_started.md b/doc/usage/getting_started.md index a408d50a7..3788059db 100644 --- a/doc/usage/getting_started.md +++ b/doc/usage/getting_started.md @@ -22,7 +22,7 @@ The procedure to provide credentials to access the cloud provider object store v ### Taking scheduled snapshot -Sub-command `snapshot` takes scheduled backups, or `snapshots` of a running `etcd` cluster, which are pushed to one of the storage providers specified above (please note that `etcd` should already be running). One can apply standard cron format scheduling for regular backup of etcd. The cron schedule is used to take full backups. The delta snapshots are taken at regular intervals in the period in between full snapshots as indicated by the `delta-snapshot-period-seconds` flag. The default for the same is 10 seconds. +Sub-command `snapshot` takes scheduled backups, or `snapshots` of a running `etcd` cluster, which are pushed to one of the storage providers specified above (please note that `etcd` should already be running). 
One can apply standard cron format scheduling for regular backup of etcd. The cron schedule is used to take full backups. The delta snapshots are taken at regular intervals in the period in between full snapshots as indicated by the `delta-snapshot-period` flag. The default for the same is 20 seconds. etcd-backup-restore has two garbage collection policies to clean up existing backups from the cloud bucket. The flag `garbage-collection-policy` is used to indicate the desired garbage collection policy. @@ -37,7 +37,7 @@ $ ./bin/etcdbrctl snapshot \ --etcd-endpoints http://localhost:2379 \ --schedule "*/1 * * * *" \ --store-container="etcd-backup" \ ---delta-snapshot-period-seconds=10 \ +--delta-snapshot-period=10s \ --max-backups=10 \ --garbage-collection-policy='LimitBased' @@ -89,7 +89,7 @@ $ ./bin/etcdbrctl snapshot \ --endpoints http://localhost:2379 \ --schedule "*/1 * * * *" \ --store-container="etcd-backup" \ ---delta-snapshot-period-seconds=10 \ +--delta-snapshot-period=10s \ --garbage-collection-policy='Exponential' INFO[0000] etcd-backup-restore Version: 0.7.0-dev diff --git a/doc/usage/metrics.md b/doc/usage/metrics.md index ac33636e3..e2fcc6616 100644 --- a/doc/usage/metrics.md +++ b/doc/usage/metrics.md @@ -30,7 +30,7 @@ Abnormally high snapshot duration (`etcdbr_snapshot_duration_seconds`) indicates `etcdbr_snapshot_gc_total` gives the total number of snapshots garbage collected since bootstrap. You can use this in coordination with `etcdbr_snapshot_duration_seconds_count` to get number of snapshots in object store. -`etcdbr_snapshot_required` indicates whether a new snapshot is required to be taken. Acts as a boolean flag where zero value implies 'false' and non-zero values imply 'true'. :warning: This metric does not work as expected for the case where delta snapshots are disabled (by setting the etcdbrctl flag `delta-snapshot-period-seconds` to 0). +`etcdbr_snapshot_required` indicates whether a new snapshot is required to be taken. 
Acts as a boolean flag where zero value implies 'false' and non-zero values imply 'true'. :warning: This metric does not work as expected for the case where delta snapshots are disabled (by setting the etcdbrctl flag `delta-snapshot-period` to 0). ### Defragmentation diff --git a/pkg/etcdutil/defrag_test.go b/pkg/etcdutil/defrag_test.go index 33cfb9569..733a3333d 100644 --- a/pkg/etcdutil/defrag_test.go +++ b/pkg/etcdutil/defrag_test.go @@ -28,19 +28,21 @@ import ( var _ = Describe("Defrag", func() { var ( tlsConfig *TLSConfig - endpoints = []string{"http://localhost:2379"} - etcdConnectionTimeout = time.Duration(30 * time.Second) + etcdConnectionTimeout = 30 * time.Second keyPrefix = "/defrag/key-" valuePrefix = "val" etcdUsername string etcdPassword string ) - tlsConfig = NewTLSConfig("", "", "", true, true, endpoints, etcdUsername, etcdPassword) + BeforeEach(func() { + tlsConfig = NewTLSConfig("", "", "", true, true, endpoints, etcdUsername, etcdPassword) + }) Context("Defragmentation", func() { BeforeEach(func() { now := time.Now().Unix() client, err := GetTLSClientForEtcd(tlsConfig) defer client.Close() + logger.Infof("TLSConfig %v, Endpoint %v", tlsConfig, endpoints) Expect(err).ShouldNot(HaveOccurred()) for index := 0; index <= 1000; index++ { ctx, cancel := context.WithTimeout(testCtx, etcdConnectionTimeout) @@ -78,7 +80,7 @@ var _ = Describe("Defrag", func() { }) It("should keep size of DB same in case of timeout", func() { - etcdConnectionTimeout = time.Duration(time.Second) + etcdConnectionTimeout = time.Second client, err := GetTLSClientForEtcd(tlsConfig) Expect(err).ShouldNot(HaveOccurred()) defer client.Close() @@ -88,7 +90,7 @@ var _ = Describe("Defrag", func() { Expect(err).ShouldNot(HaveOccurred()) oldRevision := oldStatus.Header.GetRevision() - defragmentorJob := NewDefragmentorJob(testCtx, tlsConfig, time.Duration(time.Microsecond), logger, nil) + defragmentorJob := NewDefragmentorJob(testCtx, tlsConfig, time.Microsecond, logger, nil) 
defragmentorJob.Run() ctx, cancel = context.WithTimeout(testCtx, etcdDialTimeout) diff --git a/pkg/etcdutil/etcdutil_suite_test.go b/pkg/etcdutil/etcdutil_suite_test.go index 34089cac2..28fe6d8b2 100644 --- a/pkg/etcdutil/etcdutil_suite_test.go +++ b/pkg/etcdutil/etcdutil_suite_test.go @@ -31,15 +31,15 @@ import ( const ( outputDir = "../../test/output" etcdDir = outputDir + "/default.etcd" - etcdEndpoint = "http://localhost:2379" etcdDialTimeout = time.Second * 30 ) var ( - testCtx = context.Background() - logger = logrus.New().WithField("suite", "etcdutil") - etcd *embed.Etcd - err error + testCtx = context.Background() + logger = logrus.New().WithField("suite", "etcdutil") + etcd *embed.Etcd + endpoints []string + err error ) func TestEtcdutil(t *testing.T) { @@ -48,11 +48,14 @@ func TestEtcdutil(t *testing.T) { } var _ = SynchronizedBeforeSuite(func() []byte { + logger.Logger.Out = GinkgoWriter err = os.RemoveAll(outputDir) Expect(err).ShouldNot(HaveOccurred()) etcd, err = utils.StartEmbeddedEtcd(testCtx, etcdDir, logger) Expect(err).ShouldNot(HaveOccurred()) + endpoints = []string{etcd.Clients[0].Addr().String()} + logger.Infof("endpoints: %s", endpoints) var data []byte return data }, func(data []byte) {}) @@ -60,5 +63,4 @@ var _ = SynchronizedBeforeSuite(func() []byte { var _ = SynchronizedAfterSuite(func() {}, func() { etcd.Server.Stop() etcd.Close() - os.RemoveAll(outputDir) }) diff --git a/pkg/initializer/validator/datavalidator_test.go b/pkg/initializer/validator/datavalidator_test.go index c0a1d7ec6..1f3fcea98 100644 --- a/pkg/initializer/validator/datavalidator_test.go +++ b/pkg/initializer/validator/datavalidator_test.go @@ -39,6 +39,7 @@ var _ = Describe("Running Datavalidator", func() { Logger: logger.Logger, } }) + Context("with missing data directory", func() { It("should return DataDirStatus as DataDirectoryNotExist, and non-nil error", func() { tempDir := fmt.Sprintf("%s.%s", restoreDataDir, "temp") @@ -189,8 +190,9 @@ var _ = Describe("Running 
Datavalidator", func() { }() // start etcd - etcd, err = utils.StartEmbeddedEtcd(testCtx, restoreDataDir, logger) + etcd, err := utils.StartEmbeddedEtcd(testCtx, restoreDataDir, logger) Expect(err).ShouldNot(HaveOccurred()) + endpoints := []string{etcd.Clients[0].Addr().String()} // populate etcd but with lesser data than previous populate call, so that the new db has a lower revision resp := &utils.EtcdDataPopulationResponse{} utils.PopulateEtcd(testCtx, logger, endpoints, 0, int(keyTo/2), resp) diff --git a/pkg/initializer/validator/validator_suite_test.go b/pkg/initializer/validator/validator_suite_test.go index 63b2b281d..521440bc0 100644 --- a/pkg/initializer/validator/validator_suite_test.go +++ b/pkg/initializer/validator/validator_suite_test.go @@ -4,9 +4,9 @@ import ( "context" "fmt" "io/ioutil" - "math" "os" "path" + "sync" "testing" "time" @@ -25,7 +25,6 @@ const ( outputDir = "../../../test/output" etcdDir = outputDir + "/default.etcd" snapstoreDir = outputDir + "/snapshotter.bkp" - etcdEndpoint = "http://localhost:2379" ) var ( @@ -34,7 +33,7 @@ var ( etcd *embed.Etcd err error keyTo int - endpoints = []string{etcdEndpoint} + endpoints []string etcdRevision int64 ) @@ -63,19 +62,20 @@ var _ = SynchronizedBeforeSuite(func() []byte { etcd.Server.Stop() etcd.Close() }() + endpoints = []string{etcd.Clients[0].Addr().String()} - populatorTimeout := time.Duration(15 * time.Second) - populatorCtx, cancelPopulator := context.WithTimeout(testCtx, populatorTimeout) - resp := &utils.EtcdDataPopulationResponse{} - go utils.PopulateEtcd(populatorCtx, logger, endpoints, 0, math.MaxInt64, resp) + populatorCtx, cancelPopulator := context.WithTimeout(testCtx, time.Duration(15*time.Second)) defer cancelPopulator() + resp := &utils.EtcdDataPopulationResponse{} + wg := &sync.WaitGroup{} + wg.Add(1) + go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, resp) - deltaSnapshotPeriod := 5 - snapshotterTimeout := populatorTimeout + 
time.Duration(deltaSnapshotPeriod+2)*time.Second - ctx, cancel := context.WithTimeout(testCtx, snapshotterTimeout) - defer cancel() + deltaSnapshotPeriod := 5 * time.Second + ctx := utils.ContextWithWaitGroupFollwedByGracePeriod(populatorCtx, wg, deltaSnapshotPeriod+2*time.Second) err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done()) Expect(err).ShouldNot(HaveOccurred()) + keyTo = resp.KeyTo etcdRevision = resp.EndRevision @@ -85,27 +85,23 @@ var _ = SynchronizedBeforeSuite(func() []byte { return data }, func(data []byte) {}) -var _ = SynchronizedAfterSuite(func() {}, func() { - os.RemoveAll(outputDir) -}) - // runSnapshotter creates a snapshotter object and runs it for a duration specified by 'snapshotterDurationSeconds' -func runSnapshotter(logger *logrus.Entry, deltaSnapshotPeriod int, endpoints []string, stopCh chan struct{}) error { +func runSnapshotter(logger *logrus.Entry, deltaSnapshotPeriod time.Duration, endpoints []string, stopCh <-chan struct{}) error { var ( - store snapstore.SnapStore - certFile string - keyFile string - caFile string - insecureTransport bool - insecureSkipVerify bool - maxBackups = 1 - deltaSnapshotMemoryLimit = 10 * 1024 * 1024 //10Mib - etcdConnectionTimeout = time.Duration(10) - garbageCollectionPeriodSeconds = time.Duration(60) - schedule = "0 0 1 1 *" - garbageCollectionPolicy = snapshotter.GarbageCollectionPolicyLimitBased - etcdUsername string - etcdPassword string + store snapstore.SnapStore + certFile string + keyFile string + caFile string + insecureTransport = true + insecureSkipVerify = true + maxBackups = 1 + deltaSnapshotMemoryLimit = 10 * 1024 * 1024 //10Mib + etcdConnectionTimeout = 10 * time.Second + garbageCollectionPeriod = 60 * time.Second + schedule = "0 0 1 1 *" + garbageCollectionPolicy = snapshotter.GarbageCollectionPolicyLimitBased + etcdUsername string + etcdPassword string ) store, err = snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) @@ -124,14 
+120,15 @@ func runSnapshotter(logger *logrus.Entry, deltaSnapshotPeriod int, endpoints []s etcdPassword, ) + logger.Infof("tlsconfig %v", tlsConfig) snapshotterConfig, err := snapshotter.NewSnapshotterConfig( schedule, store, maxBackups, - deltaSnapshotPeriod, deltaSnapshotMemoryLimit, + deltaSnapshotPeriod, etcdConnectionTimeout, - garbageCollectionPeriodSeconds, + garbageCollectionPeriod, garbageCollectionPolicy, tlsConfig, ) diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index 548e34980..61e1caa3d 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -311,7 +311,7 @@ func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster, restoreN } // startEmbeddedEtcd starts the embedded etcd server. -func startEmbeddedEtcd(logger *logrus.Logger, ro RestoreOptions) (*embed.Etcd, error) { +func startEmbeddedEtcd(logger *logrus.Entry, ro RestoreOptions) (*embed.Etcd, error) { cfg := embed.NewConfig() cfg.Dir = filepath.Join(ro.RestoreDataDir) DefaultListenPeerURLs := "http://localhost:0" diff --git a/pkg/snapshot/restorer/restorer_suite_test.go b/pkg/snapshot/restorer/restorer_suite_test.go index 0e9339c72..bc43a5665 100644 --- a/pkg/snapshot/restorer/restorer_suite_test.go +++ b/pkg/snapshot/restorer/restorer_suite_test.go @@ -36,7 +36,6 @@ const ( outputDir = "../../../test/output" etcdDir = outputDir + "/default.etcd" snapstoreDir = outputDir + "/snapshotter.bkp" - etcdEndpoint = "http://localhost:2379" ) var ( @@ -45,7 +44,7 @@ var ( etcd *embed.Etcd err error keyTo int - endpoints = []string{etcdEndpoint} + endpoints []string ) func TestRestorer(t *testing.T) { @@ -67,15 +66,17 @@ var _ = SynchronizedBeforeSuite(func() []byte { etcd.Server.Stop() etcd.Close() }() - + endpoints = []string{etcd.Clients[0].Addr().String()} + logger.Infof("endpoints: %s", endpoints) populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 15*time.Second) defer cancelPopulator() resp := 
&utils.EtcdDataPopulationResponse{} wg := &sync.WaitGroup{} wg.Add(1) go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, resp) - deltaSnapshotPeriod := 1 - ctx := utils.ContextWithWaitGroupFollwedByGracePeriod(populatorCtx, wg, time.Duration(deltaSnapshotPeriod+2)*time.Second) + + deltaSnapshotPeriod := time.Second + ctx := utils.ContextWithWaitGroupFollwedByGracePeriod(populatorCtx, wg, deltaSnapshotPeriod+2*time.Second) err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), true) Expect(err).ShouldNot(HaveOccurred()) @@ -101,21 +102,21 @@ func cleanUp() { } // runSnapshotter creates a snapshotter object and runs it for a duration specified by 'snapshotterDurationSeconds' -func runSnapshotter(logger *logrus.Entry, deltaSnapshotPeriod int, endpoints []string, stopCh <-chan struct{}, startWithFullSnapshot bool) error { +func runSnapshotter(logger *logrus.Entry, deltaSnapshotPeriod time.Duration, endpoints []string, stopCh <-chan struct{}, startWithFullSnapshot bool) error { var ( - store snapstore.SnapStore - certFile string - keyFile string - caFile string - insecureTransport bool - insecureSkipVerify bool - maxBackups = 1 - etcdConnectionTimeout = time.Duration(10) - garbageCollectionPeriodSeconds = time.Duration(60) - schedule = "0 0 1 1 *" - garbageCollectionPolicy = snapshotter.GarbageCollectionPolicyLimitBased - etcdUsername string - etcdPassword string + store snapstore.SnapStore + certFile string + keyFile string + caFile string + insecureTransport = true + insecureSkipVerify = true + maxBackups = 1 + etcdConnectionTimeout = 10 * time.Second + garbageCollectionPeriod = 60 * time.Second + schedule = "0 0 1 1 *" + garbageCollectionPolicy = snapshotter.GarbageCollectionPolicyLimitBased + etcdUsername string + etcdPassword string ) store, err = snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) @@ -138,10 +139,10 @@ func runSnapshotter(logger *logrus.Entry, deltaSnapshotPeriod int, 
endpoints []s schedule, store, maxBackups, - deltaSnapshotPeriod, snapshotter.DefaultDeltaSnapMemoryLimit, + deltaSnapshotPeriod, etcdConnectionTimeout, - garbageCollectionPeriodSeconds, + garbageCollectionPeriod, garbageCollectionPolicy, tlsConfig, ) diff --git a/pkg/snapshot/restorer/restorer_test.go b/pkg/snapshot/restorer/restorer_test.go index 649a68dab..e6c5e08ac 100644 --- a/pkg/snapshot/restorer/restorer_test.go +++ b/pkg/snapshot/restorer/restorer_test.go @@ -58,7 +58,7 @@ var _ = Describe("Running Restorer", func() { ) BeforeEach(func() { - + wg = &sync.WaitGroup{} restoreDataDir = etcdDir restoreClusterToken = "etcd-cluster" restoreName = "default" @@ -74,7 +74,7 @@ var _ = Describe("Running Restorer", func() { }) - PDescribe("For pre-loaded Snapstore", func() { + Describe("For pre-loaded Snapstore", func() { BeforeEach(func() { err = corruptEtcdDir() Expect(err).ShouldNot(HaveOccurred()) @@ -200,9 +200,10 @@ var _ = Describe("Running Restorer", func() { }) Context("with maximum of one fetcher allowed", func() { - It("should restore etcd data directory", func() { - maxFetchers = 1 + const maxFetchers = 1 + It("should restore etcd data directory", func() { + logger.Infof("Testing for max-fetchers: %d", maxFetchers) restoreOptions := RestoreOptions{ ClusterURLs: clusterUrlsMap, ClusterToken: restoreClusterToken, @@ -224,10 +225,10 @@ var _ = Describe("Running Restorer", func() { }) Context("with maximum of four fetchers allowed", func() { - fmt.Println("Testing for max-fetchers=4") - It("should restore etcd data directory", func() { - maxFetchers = 4 + const maxFetchers = 4 + It("should restore etcd data directory", func() { + logger.Infof("Testing for max-fetchers: %d", maxFetchers) restoreOptions := RestoreOptions{ ClusterURLs: clusterUrlsMap, ClusterToken: restoreClusterToken, @@ -249,10 +250,10 @@ var _ = Describe("Running Restorer", func() { }) Context("with maximum of hundred fetchers allowed", func() { - fmt.Println("Testing for max-fetchers=100") - 
It("should restore etcd data directory", func() { - maxFetchers = 100 + const maxFetchers = 100 + It("should restore etcd data directory", func() { + logger.Infof("Testing for max-fetchers: %d", maxFetchers) restoreOptions := RestoreOptions{ ClusterURLs: clusterUrlsMap, ClusterToken: restoreClusterToken, @@ -275,40 +276,51 @@ var _ = Describe("Running Restorer", func() { }) Describe("NEGATIVE:For Dynamic Loads and Negative Scenarios", func() { + var ( + store snapstore.SnapStore + deltaSnapshotPeriod time.Duration + endpoints []string + ) + BeforeEach(func() { + deltaSnapshotPeriod = time.Second etcd, err = utils.StartEmbeddedEtcd(testCtx, etcdDir, logger) Expect(err).ShouldNot(HaveOccurred()) - wg = &sync.WaitGroup{} + endpoints = []string{etcd.Clients[0].Addr().String()} + store, err = snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + AfterEach(func() { + etcd.Close() + cleanUp() }) - AfterEach(cleanUp) + Context("with only delta snapshots and no full snapshots", func() { + var ( + startWithFullSnapshot = false + ) - PContext("with only delta snapshots and no full snapshots", func() { It("should restore from the delta snapshots ", func() { - cleanUp() - deltaSnapshotPeriod := 1 wg.Add(1) populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 2) - defer cancelPopulator() go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, nil) - + defer cancelPopulator() logger.Infoln("Starting snapshotter with basesnapshot set to false") ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, 2) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true) + err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), startWithFullSnapshot) Expect(err).ShouldNot(HaveOccurred()) - + //etcd.Server.Stop() etcd.Close() err = corruptEtcdDir() Expect(err).ShouldNot(HaveOccurred()) - store, err = 
snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) - Expect(err).ShouldNot(HaveOccurred()) baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - logger.Infoln(deltaSnapList.Len()) - logger.Infof("base snapshot is %v", baseSnapshot) + logger.Infof("No of delta snapshots: %d", deltaSnapList.Len()) + logger.Infof("Base snapshot is %v", baseSnapshot) rstr = NewRestorer(store, logger) restoreOptions := RestoreOptions{ @@ -320,40 +332,34 @@ var _ = Describe("Running Restorer", func() { Name: restoreName, MaxFetchers: maxFetchers, EmbeddedEtcdQuotaBytes: embeddedEtcdQuotaBytes, - //BaseSnapshot: *baseSnapshot, - DeltaSnapList: deltaSnapList, + DeltaSnapList: deltaSnapList, } - restoreOptions.BaseSnapshot.SnapDir = "" restoreOptions.BaseSnapshot.SnapName = "" err := rstr.Restore(restoreOptions) + Expect(err).ShouldNot(HaveOccurred()) err = checkDataConsistency(testCtx, restoreDataDir, logger) Expect(err).ShouldNot(HaveOccurred()) - }) }) Context("with no delta snapshots", func() { It("Should restore only full snapshot", func() { - deltaSnapshotPeriod := 0 + deltaSnapshotPeriod = time.Duration(0) + logger.Infoln("Starting snapshotter for no delta snapshots") wg.Add(1) - populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 1) - defer cancelPopulator() + populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 2*time.Second) go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, nil) - - logger.Infoln("Starting snapshotter for no delta snapshots") - ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, 1) + defer cancelPopulator() + ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, time.Second) + time.Sleep(2) // This is to make sure that err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true) Expect(err).ShouldNot(HaveOccurred()) - - etcd.Server.Stop() etcd.Close() err = 
corruptEtcdDir() Expect(err).ShouldNot(HaveOccurred()) - store, err = snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) - Expect(err).ShouldNot(HaveOccurred()) baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) @@ -374,6 +380,7 @@ var _ = Describe("Running Restorer", func() { DeltaSnapList: deltaSnapList, } err = rstr.Restore(RestoreOptions) + Expect(err).ShouldNot(HaveOccurred()) }) @@ -381,31 +388,25 @@ var _ = Describe("Running Restorer", func() { Context("with corrupted snapstore", func() { It("Should not restore and return error", func() { - deltaSnapshotPeriod := 1 + logger.Infoln("Starting snapshotter for corrupted snapstore") wg.Add(1) - populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 2) - defer cancelPopulator() + populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 2*time.Second) go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, nil) - ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, 1) - - logger.Infoln("Starting snapshotter for corrupted snapstore") + defer cancelPopulator() + ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, time.Second) err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true) Expect(err).ShouldNot(HaveOccurred()) - - etcd.Server.Stop() etcd.Close() err = corruptEtcdDir() Expect(err).ShouldNot(HaveOccurred()) - store, err = snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) - Expect(err).ShouldNot(HaveOccurred()) baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - logger.Infoln(deltaSnapList.Len()) + logger.Infof("No. 
of delta snapshots: %d", deltaSnapList.Len()) - snapshotToRemove := path.Join(snapstoreDir, deltaSnapList[deltaSnapList.Len()-1].SnapDir, deltaSnapList[deltaSnapList.Len()-1].SnapName) - logger.Infoln(snapshotToRemove) + snapshotToRemove := path.Join(snapstoreDir, baseSnapshot.SnapDir, baseSnapshot.SnapName) + logger.Infof("Snapshot to remove: %s", snapshotToRemove) err = os.Remove(snapshotToRemove) logger.Infof("Removed snapshot to cause corruption %s", snapshotToRemove) Expect(err).ShouldNot(HaveOccurred()) @@ -437,22 +438,16 @@ var _ = Describe("Running Restorer", func() { Context("with etcd data dir not cleaned up before restore", func() { It("Should fail to restore", func() { - deltaSnapshotPeriod := 1 + logger.Infoln("Starting snapshotter for not cleaned etcd dir scenario") wg.Add(1) - populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 2) - defer cancelPopulator() + populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 2*time.Second) go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, nil) - ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, 2) - - logger.Infoln("Starting snapshotter for not cleaned etcd dir scenario") + defer cancelPopulator() + ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, 2*time.Second) err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true) Expect(err).ShouldNot(HaveOccurred()) - - etcd.Server.Stop() etcd.Close() - store, err = snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) - Expect(err).ShouldNot(HaveOccurred()) baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) @@ -470,26 +465,24 @@ var _ = Describe("Running Restorer", func() { BaseSnapshot: *baseSnapshot, DeltaSnapList: deltaSnapList, } - logger.Infoln("starting restore restore directory exists already") + logger.Infoln("starting restore, restore directory 
exists already") err = rstr.Restore(RestoreOptions) logger.Infof("Failed to restore because :: %s", err) - Expect(err).Should(HaveOccurred()) + Expect(err).Should(HaveOccurred()) }) }) + //this test is excluded for now and is kept for reference purpose only // there needs to be some re-look done to validate the scenarios when a restore can happen on a running snapshot and accordingly include the test // as per current understanding the flow ensures it cannot happen but external intervention can not be ruled out as the command allows calling restore while snapshotting. XContext("while snapshotter is running ", func() { - fmt.Println("Testing restore while snapshotter is happening") It("Should stop snapshotter while restore is happening", func() { - - deltaSnapshotPeriod := 1 wg.Add(1) - populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 5) + populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 5*time.Second) defer cancelPopulator() go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, nil) - ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, 15) + ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, 15*time.Second) logger.Infoln("Starting snapshotter while loading is happening") err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true) @@ -546,7 +539,7 @@ func checkDataConsistency(ctx context.Context, dir string, logger *logrus.Entry) return fmt.Errorf("unable to start embedded etcd server: %v", err) } defer etcd.Close() - + endpoints := []string{etcd.Clients[0].Addr().String()} cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 10 * time.Second, @@ -563,15 +556,11 @@ func checkDataConsistency(ctx context.Context, dir string, logger *logrus.Entry) resValue string ) - opts := []clientv3.OpOption{ - clientv3.WithLimit(1), - } - for currKey := 0; currKey <= keyTo; currKey++ { key = utils.KeyPrefix + strconv.Itoa(currKey) value = utils.ValuePrefix 
+ strconv.Itoa(currKey) - resp, err := cli.Get(testCtx, key, opts...) + resp, err := cli.Get(testCtx, key, clientv3.WithLimit(1)) if err != nil { return fmt.Errorf("unable to get value from etcd: %v", err) } diff --git a/pkg/snapshot/snapshotter/garbagecollector.go b/pkg/snapshot/snapshotter/garbagecollector.go index fc4e8d681..db354de26 100644 --- a/pkg/snapshot/snapshotter/garbagecollector.go +++ b/pkg/snapshot/snapshotter/garbagecollector.go @@ -27,8 +27,8 @@ import ( // RunGarbageCollector basically consider the older backups as garbage and deletes it func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) { - if ssr.config.garbageCollectionPeriodSeconds <= 0 { - ssr.logger.Infof("GC: Not running garbage collector since GarbageCollectionPeriodSeconds [%d] set to less than 1.", ssr.config.garbageCollectionPeriodSeconds) + if ssr.config.garbageCollectionPeriod <= time.Second { + ssr.logger.Infof("GC: Not running garbage collector since GarbageCollectionPeriod [%s] set to less than 1 second.", ssr.config.garbageCollectionPeriod) return } @@ -37,7 +37,7 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) { case <-stopCh: ssr.logger.Info("GC: Stop signal received. Closing garbage collector.") return - case <-time.After(ssr.config.garbageCollectionPeriodSeconds * time.Second): + case <-time.After(ssr.config.garbageCollectionPeriod): total := 0 ssr.logger.Info("GC: Executing garbage collection...") snapList, err := ssr.config.store.List() diff --git a/pkg/snapshot/snapshotter/snapshotter.go b/pkg/snapshot/snapshotter/snapshotter.go index e85c70962..98f0aeac6 100644 --- a/pkg/snapshot/snapshotter/snapshotter.go +++ b/pkg/snapshot/snapshotter/snapshotter.go @@ -38,7 +38,7 @@ import ( ) // NewSnapshotterConfig returns a config for the snapshotter. 
-func NewSnapshotterConfig(schedule string, store snapstore.SnapStore, maxBackups, deltaSnapshotIntervalSeconds, deltaSnapshotMemoryLimit int, etcdConnectionTimeout, garbageCollectionPeriodSeconds time.Duration, garbageCollectionPolicy string, tlsConfig *etcdutil.TLSConfig) (*Config, error) { +func NewSnapshotterConfig(schedule string, store snapstore.SnapStore, maxBackups, deltaSnapshotMemoryLimit int, deltaSnapshotInterval, etcdConnectionTimeout, garbageCollectionPeriod time.Duration, garbageCollectionPolicy string, tlsConfig *etcdutil.TLSConfig) (*Config, error) { logrus.Printf("Validating schedule...") sdl, err := cron.ParseStandard(schedule) if err != nil { @@ -49,8 +49,8 @@ func NewSnapshotterConfig(schedule string, store snapstore.SnapStore, maxBackups logrus.Infof("Found garbage collection policy: [%s], and maximum backup value %d less than 1. Setting it to default: %d ", GarbageCollectionPolicyLimitBased, maxBackups, DefaultMaxBackups) maxBackups = DefaultMaxBackups } - if deltaSnapshotIntervalSeconds < 1 { - logrus.Infof("Found delta snapshot interval %d second less than 1 second. Disabling delta snapshotting. ", deltaSnapshotIntervalSeconds) + if deltaSnapshotInterval < time.Second { + logrus.Infof("Found delta snapshot interval %s less than 1 second. Disabling delta snapshotting. ", deltaSnapshotInterval) } if deltaSnapshotMemoryLimit < 1 { logrus.Infof("Found delta snapshot memory limit %d bytes less than 1 byte. 
Setting it to default: %d ", deltaSnapshotMemoryLimit, DefaultDeltaSnapMemoryLimit) @@ -58,15 +58,15 @@ func NewSnapshotterConfig(schedule string, store snapstore.SnapStore, maxBackups } return &Config{ - schedule: sdl, - store: store, - deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds, - deltaSnapshotMemoryLimit: deltaSnapshotMemoryLimit, - etcdConnectionTimeout: etcdConnectionTimeout, - garbageCollectionPeriodSeconds: garbageCollectionPeriodSeconds, - garbageCollectionPolicy: garbageCollectionPolicy, - maxBackups: maxBackups, - tlsConfig: tlsConfig, + schedule: sdl, + store: store, + deltaSnapshotMemoryLimit: deltaSnapshotMemoryLimit, + deltaSnapshotInterval: deltaSnapshotInterval, + etcdConnectionTimeout: etcdConnectionTimeout, + garbageCollectionPeriod: garbageCollectionPeriod, + garbageCollectionPolicy: garbageCollectionPolicy, + maxBackups: maxBackups, + tlsConfig: tlsConfig, }, nil } @@ -125,10 +125,10 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}, startWithFullSnapshot bool) } } - ssr.deltaSnapshotTimer = time.NewTimer(time.Duration(DefaultDeltaSnapshotIntervalSeconds)) - if ssr.config.deltaSnapshotIntervalSeconds >= 1 { + ssr.deltaSnapshotTimer = time.NewTimer(DefaultDeltaSnapshotInterval) + if ssr.config.deltaSnapshotInterval >= time.Second { ssr.deltaSnapshotTimer.Stop() - ssr.deltaSnapshotTimer.Reset(time.Duration(ssr.config.deltaSnapshotIntervalSeconds)) + ssr.deltaSnapshotTimer.Reset(ssr.config.deltaSnapshotInterval) } return ssr.snapshotEventHandler(stopCh) @@ -213,7 +213,7 @@ func (ssr *Snapshotter) takeFullSnapshot() error { } } - ctx, cancel := context.WithTimeout(context.TODO(), ssr.config.etcdConnectionTimeout*time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), ssr.config.etcdConnectionTimeout) // Note: Although Get and snapshot call are not atomic, so revision number in snapshot file // may be ahead of the revision found from GET call. 
But currently this is the only workaround available // Refer: https://github.com/coreos/etcd/issues/9037 @@ -229,7 +229,7 @@ func (ssr *Snapshotter) takeFullSnapshot() error { if ssr.prevSnapshot.Kind == snapstore.SnapshotKindFull && ssr.prevSnapshot.LastRevision == lastRevision { ssr.logger.Infof("There are no updates since last snapshot, skipping full snapshot.") } else { - ctx, cancel = context.WithTimeout(context.TODO(), ssr.config.etcdConnectionTimeout*time.Second) + ctx, cancel = context.WithTimeout(context.TODO(), ssr.config.etcdConnectionTimeout) defer cancel() rc, err := client.Snapshot(ctx) if err != nil { @@ -260,7 +260,7 @@ func (ssr *Snapshotter) takeFullSnapshot() error { ssr.logger.Infof("Successfully saved full snapshot at: %s", path.Join(s.SnapDir, s.SnapName)) } - if ssr.config.deltaSnapshotIntervalSeconds < 1 { + if ssr.config.deltaSnapshotInterval < time.Second { // return without creating a watch on events return nil } @@ -288,12 +288,12 @@ func (ssr *Snapshotter) takeDeltaSnapshotAndResetTimer() error { } if ssr.deltaSnapshotTimer == nil { - ssr.deltaSnapshotTimer = time.NewTimer(time.Second * time.Duration(ssr.config.deltaSnapshotIntervalSeconds)) + ssr.deltaSnapshotTimer = time.NewTimer(ssr.config.deltaSnapshotInterval) } else { ssr.logger.Infof("Stopping delta snapshot...") ssr.deltaSnapshotTimer.Stop() - ssr.logger.Infof("Resetting delta snapshot to run after %d secs.", ssr.config.deltaSnapshotIntervalSeconds) - ssr.deltaSnapshotTimer.Reset(time.Second * time.Duration(ssr.config.deltaSnapshotIntervalSeconds)) + ssr.logger.Infof("Resetting delta snapshot to run after %s.", ssr.config.deltaSnapshotInterval.String()) + ssr.deltaSnapshotTimer.Reset(ssr.config.deltaSnapshotInterval) } return nil } @@ -349,7 +349,7 @@ func (ssr *Snapshotter) CollectEventsSincePrevSnapshot(stopCh <-chan struct{}) ( } } - ctx, cancel := context.WithTimeout(context.TODO(), ssr.config.etcdConnectionTimeout*time.Second) + ctx, cancel := 
context.WithTimeout(context.TODO(), ssr.config.etcdConnectionTimeout) resp, err := client.Get(ctx, "", clientv3.WithLastRev()...) cancel() if err != nil { @@ -454,7 +454,7 @@ func (ssr *Snapshotter) snapshotEventHandler(stopCh <-chan struct{}) error { return err } case <-ssr.deltaSnapshotTimer.C: - if ssr.config.deltaSnapshotIntervalSeconds >= 1 { + if ssr.config.deltaSnapshotInterval >= time.Second { if err := ssr.takeDeltaSnapshotAndResetTimer(); err != nil { return err } diff --git a/pkg/snapshot/snapshotter/snapshotter_suite_test.go b/pkg/snapshot/snapshotter/snapshotter_suite_test.go index 13c859240..e4363c9d2 100644 --- a/pkg/snapshot/snapshotter/snapshotter_suite_test.go +++ b/pkg/snapshot/snapshotter/snapshotter_suite_test.go @@ -55,7 +55,6 @@ var _ = SynchronizedBeforeSuite(func() []byte { }, func(data []byte) {}) var _ = SynchronizedAfterSuite(func() {}, func() { - os.RemoveAll(outputDir) etcd.Server.Stop() etcd.Close() }) diff --git a/pkg/snapshot/snapshotter/snapshotter_test.go b/pkg/snapshot/snapshotter/snapshotter_test.go index c97a47379..9e8908736 100644 --- a/pkg/snapshot/snapshotter/snapshotter_test.go +++ b/pkg/snapshot/snapshotter/snapshotter_test.go @@ -33,26 +33,28 @@ import ( var _ = Describe("Snapshotter", func() { var ( - endpoints []string - store snapstore.SnapStore - etcdConnectionTimeout time.Duration - garbageCollectionPeriodSeconds time.Duration - maxBackups int - schedule string - certFile string - keyFile string - caFile string - insecureTransport bool - insecureSkipVerify bool - etcdUsername string - etcdPassword string - err error + endpoints []string + store snapstore.SnapStore + etcdConnectionTimeout time.Duration + garbageCollectionPeriod time.Duration + maxBackups int + schedule string + certFile string + keyFile string + caFile string + insecureTransport bool + insecureSkipVerify bool + etcdUsername string + etcdPassword string + err error ) BeforeEach(func() { - endpoints = []string{"http://localhost:2379"} - 
etcdConnectionTimeout = 10 - garbageCollectionPeriodSeconds = 30 + endpoints = []string{etcd.Clients[0].Addr().String()} + etcdConnectionTimeout = 10 * time.Second + garbageCollectionPeriod = 30 * time.Second schedule = "*/1 * * * *" + insecureTransport = true + insecureSkipVerify = true }) Describe("creating Snapshotter", func() { @@ -79,7 +81,7 @@ var _ = Describe("Snapshotter", func() { 10, DefaultDeltaSnapMemoryLimit, etcdConnectionTimeout, - garbageCollectionPeriodSeconds, + garbageCollectionPeriod, GarbageCollectionPolicyExponential, tlsConfig) Expect(err).Should(HaveOccurred()) @@ -105,7 +107,7 @@ var _ = Describe("Snapshotter", func() { 10, DefaultDeltaSnapMemoryLimit, etcdConnectionTimeout, - garbageCollectionPeriodSeconds, + garbageCollectionPeriod, GarbageCollectionPolicyExponential, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) @@ -116,9 +118,9 @@ var _ = Describe("Snapshotter", func() { Describe("running snapshotter", func() { Context("with etcd not running at configured endpoint", func() { It("should timeout & not take any snapshot", func() { - stopCh := make(chan struct{}) - endpoints = []string{"http://localhost:5000"} - etcdConnectionTimeout = 5 + validEndpoint := etcd.Clients[0].Addr().String() + endpoints = []string{validEndpoint[:len(validEndpoint)-2]} + etcdConnectionTimeout = 5 * time.Second maxBackups = 2 testTimeout := time.Duration(time.Minute * time.Duration(maxBackups+1)) store, err = snapstore.GetSnapstore(&snapstore.Config{Container: path.Join(outputDir, "snapshotter_2.bkp")}) @@ -139,7 +141,7 @@ var _ = Describe("Snapshotter", func() { 10, DefaultDeltaSnapMemoryLimit, etcdConnectionTimeout, - garbageCollectionPeriodSeconds, + garbageCollectionPeriod, GarbageCollectionPolicyExponential, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) @@ -148,11 +150,9 @@ var _ = Describe("Snapshotter", func() { logger, snapshotterConfig) - go func() { - <-time.After(testTimeout) - close(stopCh) - }() - err = ssr.Run(stopCh, true) + ctx, cancel := 
context.WithTimeout(testCtx, testTimeout) + defer cancel() + err = ssr.Run(ctx.Done(), true) Expect(err).Should(HaveOccurred()) list, err := store.List() Expect(err).ShouldNot(HaveOccurred()) @@ -162,15 +162,14 @@ var _ = Describe("Snapshotter", func() { Context("with etcd running at configured endpoint", func() { BeforeEach(func() { - endpoints = []string{"http://localhost:2379"} + endpoints = []string{etcd.Clients[0].Addr().String()} }) Context("with unreachable schedule", func() { var ssr *Snapshotter BeforeEach(func() { - stopCh := make(chan struct{}) schedule = "* * 31 2 *" - etcdConnectionTimeout = 5 + etcdConnectionTimeout = 5 * time.Second maxBackups = 2 testTimeout := time.Duration(time.Minute * time.Duration(maxBackups+1)) store, err = snapstore.GetSnapstore(&snapstore.Config{Container: path.Join(outputDir, "snapshotter_3.bkp")}) @@ -188,10 +187,10 @@ var _ = Describe("Snapshotter", func() { schedule, store, maxBackups, - 10, DefaultDeltaSnapMemoryLimit, + 10*time.Second, etcdConnectionTimeout, - garbageCollectionPeriodSeconds, + garbageCollectionPeriod, GarbageCollectionPolicyExponential, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) @@ -199,11 +198,9 @@ var _ = Describe("Snapshotter", func() { ssr = NewSnapshotter( logger, snapshotterConfig) - go func() { - <-time.After(testTimeout) - close(stopCh) - }() - err = ssr.Run(stopCh, true) + ctx, cancel := context.WithTimeout(testCtx, testTimeout) + defer cancel() + err = ssr.Run(ctx.Done(), true) Expect(err).Should(HaveOccurred()) }) @@ -222,28 +219,27 @@ var _ = Describe("Snapshotter", func() { Context("with valid schedule", func() { var ( - ssr *Snapshotter - schedule string - maxBackups int - testTimeout time.Duration - deltaSnapshotIntervalSeconds int + ssr *Snapshotter + schedule string + maxBackups int + testTimeout time.Duration + deltaSnapshotInterval time.Duration ) BeforeEach(func() { - endpoints = []string{"http://localhost:2379"} + endpoints = []string{etcd.Clients[0].Addr().String()} schedule 
= "*/1 * * * *" maxBackups = 2 // We will wait for maxBackups+1 times schedule period testTimeout = time.Duration(time.Minute * time.Duration(maxBackups+1)) - etcdConnectionTimeout = 5 + etcdConnectionTimeout = 5 * time.Second }) Context("with delta snapshot interval set to zero seconds", func() { BeforeEach(func() { - deltaSnapshotIntervalSeconds = 0 + deltaSnapshotInterval = 0 testTimeout = time.Duration(time.Minute * time.Duration(maxBackups)) }) It("should take periodic backups without delta snapshots", func() { - stopCh := make(chan struct{}) store, err = snapstore.GetSnapstore(&snapstore.Config{Container: path.Join(outputDir, "snapshotter_4.bkp")}) Expect(err).ShouldNot(HaveOccurred()) tlsConfig := etcdutil.NewTLSConfig( @@ -259,10 +255,10 @@ var _ = Describe("Snapshotter", func() { schedule, store, maxBackups, - deltaSnapshotIntervalSeconds, DefaultDeltaSnapMemoryLimit, + deltaSnapshotInterval, etcdConnectionTimeout, - garbageCollectionPeriodSeconds, + garbageCollectionPeriod, GarbageCollectionPolicyExponential, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) @@ -271,11 +267,9 @@ var _ = Describe("Snapshotter", func() { logger, snapshotterConfig) - go func() { - <-time.After(testTimeout) - close(stopCh) - }() - err = ssr.Run(stopCh, true) + ctx, cancel := context.WithTimeout(testCtx, testTimeout) + defer cancel() + err = ssr.Run(ctx.Done(), true) Expect(err).ShouldNot(HaveOccurred()) list, err := store.List() Expect(err).ShouldNot(HaveOccurred()) @@ -288,7 +282,8 @@ var _ = Describe("Snapshotter", func() { Context("with delta snapshots enabled", func() { BeforeEach(func() { - deltaSnapshotIntervalSeconds = 10 + deltaSnapshotInterval = 10 * time.Second + testTimeout = time.Duration(time.Minute * time.Duration(maxBackups+1)) }) Context("with snapshotter starting without first full snapshot", func() { @@ -308,10 +303,10 @@ var _ = Describe("Snapshotter", func() { schedule, store, maxBackups, - deltaSnapshotIntervalSeconds, DefaultDeltaSnapMemoryLimit, + 
deltaSnapshotInterval, etcdConnectionTimeout, - garbageCollectionPeriodSeconds, + garbageCollectionPeriod, GarbageCollectionPolicyExponential, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) @@ -352,10 +347,10 @@ var _ = Describe("Snapshotter", func() { schedule, store, maxBackups, - deltaSnapshotIntervalSeconds, DefaultDeltaSnapMemoryLimit, + deltaSnapshotInterval, etcdConnectionTimeout, - garbageCollectionPeriodSeconds, + garbageCollectionPeriod, GarbageCollectionPolicyExponential, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) @@ -389,12 +384,12 @@ var _ = Describe("Snapshotter", func() { testTimeout time.Duration ) BeforeEach(func() { - endpoints = []string{"http://localhost:2379"} + endpoints = []string{etcd.Clients[0].Addr().String()} schedule = "*/1 * * * *" maxBackups = 2 - garbageCollectionPeriodSeconds = 5 - testTimeout = time.Duration(time.Second * time.Duration(garbageCollectionPeriodSeconds*2)) - etcdConnectionTimeout = 5 + garbageCollectionPeriod = 5 * time.Second + testTimeout = garbageCollectionPeriod * 2 + etcdConnectionTimeout = 5 * time.Second }) It("should garbage collect exponentially", func() { @@ -511,10 +506,10 @@ var _ = Describe("Snapshotter", func() { schedule, store, maxBackups, - 10, DefaultDeltaSnapMemoryLimit, + 10*time.Second, etcdConnectionTimeout, - garbageCollectionPeriodSeconds, + garbageCollectionPeriod, GarbageCollectionPolicyExponential, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) @@ -557,10 +552,10 @@ var _ = Describe("Snapshotter", func() { schedule, store, maxBackups, - 10, DefaultDeltaSnapMemoryLimit, + 10*time.Second, etcdConnectionTimeout, - garbageCollectionPeriodSeconds, + garbageCollectionPeriod, GarbageCollectionPolicyLimitBased, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) diff --git a/pkg/snapshot/snapshotter/types.go b/pkg/snapshot/snapshotter/types.go index 84dfddaa1..3e27777c3 100644 --- a/pkg/snapshot/snapshotter/types.go +++ b/pkg/snapshot/snapshotter/types.go @@ -40,8 +40,8 @@ const ( 
SnapshotterActive State = 1 // DefaultDeltaSnapMemoryLimit is default memory limit for delta snapshots. DefaultDeltaSnapMemoryLimit = 10 * 1024 * 1024 //10Mib - // DefaultDeltaSnapshotIntervalSeconds is the default interval for delta snapshots in seconds. - DefaultDeltaSnapshotIntervalSeconds = 20 + // DefaultDeltaSnapshotInterval is the default interval for delta snapshots. + DefaultDeltaSnapshotInterval = 20 * time.Second ) var emptyStruct struct{} @@ -69,15 +69,15 @@ type Snapshotter struct { // Config stores the configuration parameters for the snapshotter. type Config struct { - schedule cron.Schedule - store snapstore.SnapStore - maxBackups int - deltaSnapshotIntervalSeconds int - deltaSnapshotMemoryLimit int - etcdConnectionTimeout time.Duration - garbageCollectionPeriodSeconds time.Duration - garbageCollectionPolicy string - tlsConfig *etcdutil.TLSConfig + schedule cron.Schedule + store snapstore.SnapStore + maxBackups int + deltaSnapshotMemoryLimit int + deltaSnapshotInterval time.Duration + etcdConnectionTimeout time.Duration + garbageCollectionPeriod time.Duration + garbageCollectionPolicy string + tlsConfig *etcdutil.TLSConfig } // event is wrapper over etcd event to keep track of time of event diff --git a/test/e2e/integration/cloud_backup_test.go b/test/e2e/integration/cloud_backup_test.go index c0c2cdb67..c349889e7 100644 --- a/test/e2e/integration/cloud_backup_test.go +++ b/test/e2e/integration/cloud_backup_test.go @@ -63,7 +63,7 @@ func startSnapshotter() (*Cmd, *chan error) { "snapshot", "--max-backups=1", "--garbage-collection-policy=LimitBased", - "--garbage-collection-period-seconds=30", + "--garbage-collection-period=30s", "--schedule=*/1 * * * *", "--storage-provider=S3", "--store-container=" + os.Getenv("TEST_ID"), @@ -92,8 +92,8 @@ func startBackupRestoreServer() (*Cmd, *chan error) { "--data-dir=" + os.Getenv("ETCD_DATA_DIR"), "--insecure-transport=true", "--garbage-collection-policy=LimitBased", - "--garbage-collection-period-seconds=30", 
- "--delta-snapshot-period-seconds=10", + "--garbage-collection-period=30s", + "--delta-snapshot-period=10s", "--schedule=*/1 * * * *", "--storage-provider=S3", "--store-container=" + os.Getenv("TEST_ID"), diff --git a/test/perf/regression/resources/etcd/pod.yaml b/test/perf/regression/resources/etcd/pod.yaml index dd71262ef..cef333c57 100644 --- a/test/perf/regression/resources/etcd/pod.yaml +++ b/test/perf/regression/resources/etcd/pod.yaml @@ -28,7 +28,7 @@ spec: initialDelaySeconds: 15 periodSeconds: 5 successThreshold: 1 - timeoutSeconds: 1 + timeoutSeconds: 1 ports: - containerPort: 2380 name: serverport @@ -71,11 +71,11 @@ spec: - --insecure-transport=true - --insecure-skip-tls-verify=true - --endpoints=http://localhost:2379 - - --etcd-connection-timeout=300 + - --etcd-connection-timeout=30s # Delta snapshot once in every 30s to ensure there are enough delta spanshots between full snapshots. - - --delta-snapshot-period-seconds=30 + - --delta-snapshot-period=30s - --delta-snapshot-memory-limit=10485760 - - --garbage-collection-period-seconds=43200 + - --garbage-collection-period=12h - --snapstore-temp-directory=/var/etcd/data/temp env: - name: GODEBUG diff --git a/test/utils/utils.go b/test/utils/utils.go index 782f61589..59c448dd6 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -35,7 +35,8 @@ const ( ValuePrefix = "val-" ) -// StartEmbeddedEtcd starts the embedded etcd for test purpose with minimal configuration. +// StartEmbeddedEtcd starts the embedded etcd for test purpose with minimal configuration at random port. 
+// To get the exact client endpoint it is listening on, use etcd.Clients[0].Addr().String() func StartEmbeddedEtcd(ctx context.Context, etcdDir string, logger *logrus.Entry) (*embed.Etcd, error) { logger.Infoln("Starting embedded etcd...") cfg := embed.NewConfig() @@ -66,7 +67,7 @@ func StartEmbeddedEtcd(ctx context.Context, etcdDir string, logger *logrus.Entry defer cancel() select { case <-e.Server.ReadyNotify(): - logger.Infof("Embedded server is ready to listen client at: %s", e.Clients[0]) + logger.Infof("Embedded server is ready to listen client at: %s", e.Clients[0].Addr()) case <-etcdWaitCtx.Done(): e.Server.Stop() // trigger a shutdown e.Close() @@ -88,7 +89,7 @@ func PopulateEtcd(ctx context.Context, logger *logrus.Entry, endpoints []string, response = &EtcdDataPopulationResponse{} } response.KeyTo = keyFrom - 1 - logger.Infof("\n\nkeyFrom: %v, keyTo: %v", keyFrom, keyTo) + logger.Infof("keyFrom: %v, keyTo: %v", keyFrom, keyTo) cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 10 * time.Second, @@ -102,11 +103,12 @@ func PopulateEtcd(ctx context.Context, logger *logrus.Entry, endpoints []string, for { select { case <-ctx.Done(): - logger.Infof("Populated data till key %s into embedded etcd with etcd end revision :%v", KeyPrefix+strconv.Itoa(response.KeyTo), response.EndRevision) + logger.Infof("Populated data till key %s into embedded etcd with etcd end revision: %v", KeyPrefix+strconv.Itoa(response.KeyTo), response.EndRevision) return case <-time.After(time.Second): + response.KeyTo++ if response.KeyTo > keyTo { - logger.Infof("Populated data till key %s into embedded etcd with etcd end revision :%v", KeyPrefix+strconv.Itoa(response.KeyTo), response.EndRevision) + logger.Infof("Populated data till key %s into embedded etcd with etcd end revision: %v", KeyPrefix+strconv.Itoa(response.KeyTo), response.EndRevision) return } key := KeyPrefix + strconv.Itoa(response.KeyTo) @@ -114,10 +116,9 @@ func PopulateEtcd(ctx
context.Context, logger *logrus.Entry, endpoints []string, resp, err := cli.Put(ctx, key, value) if err != nil { response.Err = fmt.Errorf("unable to put key-value pair (%s, %s) into embedded etcd: %v", key, value, err) - logger.Infof("Populated data till key %s into embedded etcd with etcd end revision :%v", KeyPrefix+strconv.Itoa(response.KeyTo), response.EndRevision) + logger.Infof("Populated data till key %s into embedded etcd with etcd end revision: %v", KeyPrefix+strconv.Itoa(response.KeyTo), response.EndRevision) return } - response.KeyTo++ response.EndRevision = resp.Header.GetRevision() //call a delete for every 10th Key after putting it in the store to check deletes in consistency check // handles deleted keys as every 10th key is deleted during populate etcd call @@ -128,7 +129,7 @@ func PopulateEtcd(ctx context.Context, logger *logrus.Entry, endpoints []string, resp, err := cli.Delete(ctx, key) if err != nil { response.Err = fmt.Errorf("unable to delete key (%s) from embedded etcd: %v", key, err) - logger.Infof("Populated data till key %s into embedded etcd with etcd end revision :%v", KeyPrefix+strconv.Itoa(response.KeyTo), response.EndRevision) + logger.Infof("Populated data till key %s into embedded etcd with etcd end revision: %v", KeyPrefix+strconv.Itoa(response.KeyTo), response.EndRevision) return } response.EndRevision = resp.Header.GetRevision()