diff --git a/cmd/server.go b/cmd/server.go index ab78caf1c..d2d4b8a46 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -123,6 +123,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command { ss, maxBackups, deltaSnapshotIntervalSeconds, + deltaSnapshotMemoryLimit, time.Duration(etcdConnectionTimeout), time.Duration(garbageCollectionPeriodSeconds), garbageCollectionPolicy, diff --git a/cmd/snapshot.go b/cmd/snapshot.go index c5e6e3f04..fc829468c 100644 --- a/cmd/snapshot.go +++ b/cmd/snapshot.go @@ -56,6 +56,7 @@ storing snapshots on various cloud storage providers as well as local disk locat ss, maxBackups, deltaSnapshotIntervalSeconds, + deltaSnapshotMemoryLimit, time.Duration(etcdConnectionTimeout), time.Duration(garbageCollectionPeriodSeconds), garbageCollectionPolicy, @@ -92,8 +93,9 @@ storing snapshots on various cloud storage providers as well as local disk locat func initializeSnapshotterFlags(cmd *cobra.Command) { cmd.Flags().StringSliceVarP(&etcdEndpoints, "endpoints", "e", []string{"127.0.0.1:2379"}, "comma separated list of etcd endpoints") cmd.Flags().StringVarP(&schedule, "schedule", "s", "* */1 * * *", "schedule for snapshots") - cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-period-seconds", "i", 10, "Period in seconds after which delta snapshot will be persisted") - cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", 7, "maximum number of previous backups to keep") + cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-period-seconds", "i", snapshotter.DefaultDeltaSnapshotIntervalSeconds, "Period in seconds after which delta snapshot will be persisted") + cmd.Flags().IntVar(&deltaSnapshotMemoryLimit, "delta-snapshot-memory-limit", snapshotter.DefaultDeltaSnapMemoryLimit, "memory limit after which delta snapshots will be taken") + cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", snapshotter.DefaultMaxBackups, "maximum number of previous backups to keep") cmd.Flags().IntVar(&etcdConnectionTimeout, "etcd-connection-timeout", 30, "etcd client connection timeout") cmd.Flags().IntVar(&garbageCollectionPeriodSeconds, "garbage-collection-period-seconds", 60, "Period in seconds for garbage collecting old backups") cmd.Flags().StringVar(&garbageCollectionPolicy, "garbage-collection-policy", snapshotter.GarbageCollectionPolicyExponential, "Policy for garbage collecting old backups") diff --git a/cmd/types.go b/cmd/types.go index 96b32f161..40a91b093 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -32,6 +32,7 @@ var ( schedule string etcdEndpoints []string deltaSnapshotIntervalSeconds int + deltaSnapshotMemoryLimit int maxBackups int etcdConnectionTimeout int garbageCollectionPeriodSeconds int diff --git a/pkg/snapshot/restorer/restorer_suite_test.go b/pkg/snapshot/restorer/restorer_suite_test.go index fab6a0ab3..2348f1409 100644 --- a/pkg/snapshot/restorer/restorer_suite_test.go +++ b/pkg/snapshot/restorer/restorer_suite_test.go @@ -157,6 +157,7 @@ func runSnapshotter(logger *logrus.Logger, endpoints []string, stopCh chan struc store, maxBackups, deltaSnapshotPeriod, + snapshotter.DefaultDeltaSnapMemoryLimit, etcdConnectionTimeout, garbageCollectionPeriodSeconds, garbageCollectionPolicy, diff --git a/pkg/snapshot/snapshotter/garbagecollector.go b/pkg/snapshot/snapshotter/garbagecollector.go index 89b8d2f25..e67b0ca48 100644 --- a/pkg/snapshot/snapshotter/garbagecollector.go +++ b/pkg/snapshot/snapshotter/garbagecollector.go @@ -24,6 +24,10 @@ import ( // RunGarbageCollector basically consider the older backups as garbage and deletes it func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) { + if ssr.config.garbageCollectionPeriodSeconds <= 0 { + ssr.logger.Infof("GC: Not running garbage collector since GarbageCollectionPeriodSeconds [%d] set to less than 1.", ssr.config.garbageCollectionPeriodSeconds) + return + } for { select { case <-stopCh: diff --git a/pkg/snapshot/snapshotter/snapshotter.go b/pkg/snapshot/snapshotter/snapshotter.go index 0d77809d1..80fb16f8a 100644 --- a/pkg/snapshot/snapshotter/snapshotter.go +++ b/pkg/snapshot/snapshotter/snapshotter.go @@ -33,19 +33,31 @@ import ( ) // NewSnapshotterConfig returns a config for the snapshotter. -func NewSnapshotterConfig(schedule string, store snapstore.SnapStore, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout, garbageCollectionPeriodSeconds time.Duration, garbageCollectionPolicy string, tlsConfig *etcdutil.TLSConfig) (*Config, error) { +func NewSnapshotterConfig(schedule string, store snapstore.SnapStore, maxBackups, deltaSnapshotIntervalSeconds, deltaSnapshotMemoryLimit int, etcdConnectionTimeout, garbageCollectionPeriodSeconds time.Duration, garbageCollectionPolicy string, tlsConfig *etcdutil.TLSConfig) (*Config, error) { logrus.Printf("Validating schedule...") sdl, err := cron.ParseStandard(schedule) if err != nil { return nil, fmt.Errorf("invalid schedule provied %s : %v", schedule, err) } - if maxBackups < 1 { - return nil, fmt.Errorf("maximum backups limit should be greater than zero. Input MaxBackups: %d", maxBackups) + + if garbageCollectionPolicy == GarbageCollectionPolicyLimitBased && maxBackups < 1 { + logrus.Infof("Found garbage collection policy: [%s], and maximum backup value %d less than 1. Setting it to default: %d ", GarbageCollectionPolicyLimitBased, maxBackups, DefaultMaxBackups) + maxBackups = DefaultMaxBackups + } + if deltaSnapshotIntervalSeconds < 1 { + logrus.Infof("Found delta snapshot interval %d second less than 1 second. Setting it to default: %d ", deltaSnapshotIntervalSeconds, DefaultDeltaSnapshotIntervalSeconds) + deltaSnapshotIntervalSeconds = DefaultDeltaSnapshotIntervalSeconds + } + if deltaSnapshotMemoryLimit < 1 { + logrus.Infof("Found delta snapshot memory limit %d bytes less than 1 byte. Setting it to default: %d ", deltaSnapshotMemoryLimit, DefaultDeltaSnapMemoryLimit) + deltaSnapshotMemoryLimit = DefaultDeltaSnapMemoryLimit } + return &Config{ schedule: sdl, store: store, deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds, + deltaSnapshotMemoryLimit: deltaSnapshotMemoryLimit, etcdConnectionTimeout: etcdConnectionTimeout, garbageCollectionPeriodSeconds: garbageCollectionPeriodSeconds, garbageCollectionPolicy: garbageCollectionPolicy, @@ -191,6 +203,7 @@ func (ssr *Snapshotter) takeFullSnapshot() error { ssr.logger.Infof("Successfully saved full snapshot at: %s", path.Join(s.SnapDir, s.SnapName)) //make event array empty as any event prior to full snapshot should not be uploaded in delta. ssr.events = []*event{} + ssr.eventMemory = 0 watchCtx, cancelWatch := context.WithCancel(context.TODO()) ssr.cancelWatch = cancelWatch ssr.watchCh = client.Watch(watchCtx, "", clientv3.WithPrefix(), clientv3.WithRev(ssr.prevSnapshot.LastRevision+1)) @@ -200,6 +213,13 @@ func (ssr *Snapshotter) takeFullSnapshot() error { } func (ssr *Snapshotter) takeDeltaSnapshotAndResetTimer() error { + if err := ssr.takeDeltaSnapshot(); err != nil { + // As per design principle, in business critical service if backup is not working, + // it's better to fail the process. So, we are quiting here. + ssr.logger.Warnf("Taking delta snapshot failed: %v", err) + return err + } + ssr.eventMemory = 0 if ssr.deltaSnapshotTimer == nil { ssr.deltaSnapshotTimer = time.NewTimer(time.Second * time.Duration(ssr.config.deltaSnapshotIntervalSeconds)) } else { @@ -208,12 +228,6 @@ func (ssr *Snapshotter) takeDeltaSnapshotAndResetTimer() error { ssr.logger.Infof("Reseting delta snapshot to run after %d secs.", ssr.config.deltaSnapshotIntervalSeconds) ssr.deltaSnapshotTimer.Reset(time.Second * time.Duration(ssr.config.deltaSnapshotIntervalSeconds)) } - if err := ssr.takeDeltaSnapshot(); err != nil { - // As per design principle, in business critical service if backup is not working, - // it's better to fail the process. So, we are quiting here. - ssr.logger.Warnf("Taking delta snapshot failed: %v", err) - return err - } return nil } @@ -234,10 +248,12 @@ func (ssr *Snapshotter) takeDeltaSnapshot() error { } // compute hash hash := sha256.New() - hash.Write(data) + if _, err := hash.Write(data); err != nil { + return fmt.Errorf("failed to compute hash of events: %v", err) + } data = hash.Sum(data) - dataReader := bytes.NewReader(data) + dataReader := bytes.NewReader(data) if err := ssr.config.store.Save(*snap, dataReader); err != nil { ssr.logger.Errorf("Error saving delta snapshots. %v", err) return err @@ -253,9 +269,19 @@ func (ssr *Snapshotter) handleDeltaWatchEvents(wr clientv3.WatchResponse) error } // aggregate events for _, ev := range wr.Events { - ssr.events = append(ssr.events, newEvent(ev)) + timedEvent := newEvent(ev) + jsonByte, err := json.Marshal(timedEvent) + if err != nil { + return fmt.Errorf("failed to marshal events to json: %v", err) + } + ssr.eventMemory = ssr.eventMemory + len(jsonByte) + ssr.events = append(ssr.events, timedEvent) } ssr.logger.Debugf("Added events till revision: %d", ssr.events[len(ssr.events)-1].EtcdEvent.Kv.ModRevision) + if ssr.eventMemory >= ssr.config.deltaSnapshotMemoryLimit { + ssr.logger.Infof("Delta events memory crossed the memory limit: %d Bytes", ssr.eventMemory) + return ssr.takeDeltaSnapshotAndResetTimer() + } return nil } diff --git a/pkg/snapshot/snapshotter/snapshotter_test.go b/pkg/snapshot/snapshotter/snapshotter_test.go index dacbe9bc2..0d20246e1 100644 --- a/pkg/snapshot/snapshotter/snapshotter_test.go +++ b/pkg/snapshot/snapshotter/snapshotter_test.go @@ -71,6 +71,7 @@ var _ = Describe("Snapshotter", func() { store, 1, 10, + DefaultDeltaSnapMemoryLimit, etcdConnectionTimeout, garbageCollectionPeriodSeconds, GarbageCollectionPolicyExponential, @@ -94,6 +95,7 @@ var _ = Describe("Snapshotter", func() { store, 1, 10, + DefaultDeltaSnapMemoryLimit, etcdConnectionTimeout, garbageCollectionPeriodSeconds, GarbageCollectionPolicyExponential, @@ -125,6 +127,7 @@ var _ = Describe("Snapshotter", func() { store, maxBackups, 10, + DefaultDeltaSnapMemoryLimit, etcdConnectionTimeout, garbageCollectionPeriodSeconds, GarbageCollectionPolicyExponential, @@ -174,6 +177,7 @@ var _ = Describe("Snapshotter", func() { store, maxBackups, 10, + DefaultDeltaSnapMemoryLimit, etcdConnectionTimeout, garbageCollectionPeriodSeconds, GarbageCollectionPolicyExponential, @@ -231,6 +235,7 @@ var _ = Describe("Snapshotter", func() { store, maxBackups, 10, + DefaultDeltaSnapMemoryLimit, etcdConnectionTimeout, garbageCollectionPeriodSeconds, GarbageCollectionPolicyExponential, @@ -376,6 +381,7 @@ var _ = Describe("Snapshotter", func() { store, maxBackups, 10, + DefaultDeltaSnapMemoryLimit, etcdConnectionTimeout, garbageCollectionPeriodSeconds, GarbageCollectionPolicyExponential, @@ -426,6 +432,7 @@ var _ = Describe("Snapshotter", func() { store, maxBackups, 10, + DefaultDeltaSnapMemoryLimit, etcdConnectionTimeout, garbageCollectionPeriodSeconds, GarbageCollectionPolicyLimitBased, diff --git a/pkg/snapshot/snapshotter/types.go b/pkg/snapshot/snapshotter/types.go index c62e9c257..24d80bbb1 100644 --- a/pkg/snapshot/snapshotter/types.go +++ b/pkg/snapshot/snapshotter/types.go @@ -31,20 +31,24 @@ const ( GarbageCollectionPolicyExponential = "Exponential" // GarbageCollectionPolicyLimitBased defines the limit based policy for garbage collecting old backups GarbageCollectionPolicyLimitBased = "LimitBased" -) - -var emptyStruct struct{} - -// State denotes the state the snapshotter would be in. -type State int + // DefaultMaxBackups is default number of maximum backups for limit based garbage collection policy. + DefaultMaxBackups = 7 -const ( // SnapshotterInactive is set when the snapshotter has not started taking snapshots. SnapshotterInactive State = 0 // SnapshotterActive is set when the snapshotter has started taking snapshots. SnapshotterActive State = 1 + // DefaultDeltaSnapMemoryLimit is default memory limit for delta snapshots. + DefaultDeltaSnapMemoryLimit = 10 * 1024 * 1024 //10Mib + // DefaultDeltaSnapshotIntervalSeconds is the default interval for delta snapshots in seconds. + DefaultDeltaSnapshotIntervalSeconds = 20 ) +var emptyStruct struct{} + +// State denotes the state the snapshotter would be in. +type State int + // Snapshotter is a struct for etcd snapshot taker type Snapshotter struct { logger *logrus.Logger @@ -58,6 +62,7 @@ type Snapshotter struct { cancelWatch context.CancelFunc SsrStateMutex *sync.Mutex SsrState State + eventMemory int } // Config stores the configuration parameters for the snapshotter. @@ -66,6 +71,7 @@ type Config struct { store snapstore.SnapStore maxBackups int deltaSnapshotIntervalSeconds int + deltaSnapshotMemoryLimit int etcdConnectionTimeout time.Duration garbageCollectionPeriodSeconds time.Duration garbageCollectionPolicy string